Commit f67ad27

Auto-adjusting consumer fetch size
Related to dpkp#42.

Adds a new ConsumerFetchSizeTooSmall exception, thrown when `_decode_message_set_iter` gets a BufferUnderflowError but has not yet yielded a message. In this event, SimpleConsumer will increase the fetch size by a factor of 1.5 and continue the fetching loop while _not_ increasing the offset (basically, it just retries the request with a larger fetch size). Once the consumer fetch size has been increased, it will remain increased while SimpleConsumer fetches from that partition.

1 parent 40d8e9e
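To make the growth rule concrete: a message larger than the current fetch size triggers repeated refetches of the same offset, each with a 1.5x larger buffer, until the message fits. A small illustrative sketch of what that costs (the real loop starts from SimpleConsumer's fetch_min_bytes; the 4096-byte figure here is just an assumed example value, not taken from this commit):

def retries_needed(message_size, fetch_size=4096, factor=1.5):
    # Count the ConsumerFetchSizeTooSmall round trips a message of
    # `message_size` bytes would trigger under the 1.5x growth rule.
    retries = 0
    while fetch_size < message_size:
        fetch_size *= factor  # same growth rule the commit adds
        retries += 1
    return retries

# e.g. a 50 KB message starting from 4 KB:
# retries_needed(50 * 1024) -> 7 refetches (4096 * 1.5**7 = 69984 bytes)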

4 files changed, +43 -33 lines

kafka/consumer.py (+25 -15)

@@ -12,7 +12,7 @@
 )
 
 from kafka.util import (
-    ReentrantTimer
+    ReentrantTimer, ConsumerFetchSizeTooSmall
 )
 
 log = logging.getLogger("kafka")
@@ -357,29 +357,39 @@ def __iter_partition__(self, partition, offset):
         if self.fetch_started[partition]:
             offset += 1
 
+        fetch_size = self.fetch_min_bytes
+
         while True:
-            req = FetchRequest(self.topic, partition, offset, self.fetch_min_bytes)
+            req = FetchRequest(self.topic, partition, offset, fetch_size)
 
             (resp,) = self.client.send_fetch_request([req],
                                                      max_wait_time=self.fetch_max_wait_time,
-                                                     min_bytes=self.fetch_min_bytes)
+                                                     min_bytes=fetch_size)
 
             assert resp.topic == self.topic
             assert resp.partition == partition
 
             next_offset = None
-            for message in resp.messages:
-                next_offset = message.offset
-
-                # update the offset before the message is yielded. This is
-                # so that the consumer state is not lost in certain cases.
-                # For eg: the message is yielded and consumed by the caller,
-                # but the caller does not come back into the generator again.
-                # The message will be consumed but the status will not be
-                # updated in the consumer
-                self.fetch_started[partition] = True
-                self.offsets[partition] = message.offset
-                yield message
+            try:
+                for message in resp.messages:
+                    next_offset = message.offset
+
+                    # update the offset before the message is yielded. This is
+                    # so that the consumer state is not lost in certain cases.
+                    # For eg: the message is yielded and consumed by the caller,
+                    # but the caller does not come back into the generator again.
+                    # The message will be consumed but the status will not be
+                    # updated in the consumer
+                    self.fetch_started[partition] = True
+                    self.offsets[partition] = message.offset
+                    yield message
+            except ConsumerFetchSizeTooSmall, e:
+                log.warn("Fetch size is too small, increasing by 1.5x and retrying")
+                fetch_size *= 1.5
+                continue
+            except ConsumerNoMoreData, e:
+                log.debug("Iteration was ended by %r", e)
+
             if next_offset is None:
                 break
             else:
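Stripped of the Kafka specifics, the new control flow in __iter_partition__ is: on ConsumerFetchSizeTooSmall, grow the buffer and retry without touching the offset; only advance the offset once messages actually arrive. A minimal, self-contained sketch of that loop (fetch() and its signature are hypothetical stand-ins, not the kafka-python API):

class ConsumerFetchSizeTooSmall(Exception):
    pass

def iter_partition(fetch, offset, fetch_size=4096):
    # fetch(offset, size) returns a list of (offset, payload) pairs and
    # raises ConsumerFetchSizeTooSmall if `size` cannot hold one message.
    while True:
        try:
            messages = fetch(offset, fetch_size)
        except ConsumerFetchSizeTooSmall:
            fetch_size *= 1.5  # grow, then retry the *same* offset
            continue
        if not messages:
            break
        for msg_offset, payload in messages:
            offset = msg_offset + 1  # advance only after a successful fetch
            yield payload

Because fetch_size is a local variable of the generator, the enlarged value persists for as long as iteration continues on that partition and resets the next time the generator is created, matching the commit message.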

kafka/protocol.py (+10 -6)

@@ -13,7 +13,7 @@
 from kafka.util import (
     read_short_string, read_int_string, relative_unpack,
     write_short_string, write_int_string, group_by_topic_and_partition,
-    BufferUnderflowError, ChecksumError
+    BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall
 )
 
 log = logging.getLogger("kafka")
@@ -110,17 +110,21 @@ def _decode_message_set_iter(cls, data):
         recurse easily.
         """
         cur = 0
+        read_message = False
         while cur < len(data):
             try:
                 ((offset, ), cur) = relative_unpack('>q', data, cur)
                 (msg, cur) = read_int_string(data, cur)
-                for (offset, message) in KafkaProtocol._decode_message(msg,
-                                                                       offset):
+                for (offset, message) in KafkaProtocol._decode_message(msg, offset):
+                    read_message = True
                     yield OffsetAndMessage(offset, message)
-
             except BufferUnderflowError:
-                # If we get a partial read of a message, stop
-                raise StopIteration()
+                if read_message is False:
+                    # If we get a partial read of a message, but haven't yielded anything
+                    # there's a problem
+                    raise ConsumerFetchSizeTooSmall()
+                else:
+                    raise StopIteration()
 
     @classmethod
     def _decode_message(cls, data, offset):
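The protocol change is the crux of the commit: a BufferUnderflowError now means two different things depending on whether the iterator has already yielded a message. A minimal, self-contained sketch of the pattern (parse_one() is a hypothetical stand-in for the relative_unpack / read_int_string / _decode_message sequence; the sketch returns where the 2013-era code raised StopIteration, since raising StopIteration inside a generator is an error in modern Python):

class BufferUnderflowError(Exception):
    pass

class ConsumerFetchSizeTooSmall(Exception):
    pass

def decode_message_set(data, parse_one):
    # parse_one(data, cur) -> (message, new_cur); raises
    # BufferUnderflowError when the message at `cur` is truncated.
    cur = 0
    read_message = False
    while cur < len(data):
        try:
            message, cur = parse_one(data, cur)
            read_message = True
            yield message
        except BufferUnderflowError:
            if not read_message:
                # Not even one whole message fit in the fetch buffer:
                # tell the consumer to retry with a larger fetch size.
                raise ConsumerFetchSizeTooSmall()
            # A truncated trailing message after at least one full one
            # is the normal end of a size-limited fetch: just stop.
            return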

kafka/util.py (+2)

@@ -73,6 +73,8 @@ class BufferUnderflowError(Exception):
 class ChecksumError(Exception):
     pass
 
+class ConsumerFetchSizeTooSmall(Exception):
+    pass
 
 class ReentrantTimer(object):
     """

test/test_integration.py (+6 -12)

@@ -8,7 +8,6 @@
 from kafka import * # noqa
 from kafka.common import * # noqa
 from kafka.codec import has_gzip, has_snappy
-
 from .fixtures import ZookeeperFixture, KafkaFixture
 
 
@@ -757,20 +756,15 @@ def test_large_messages(self):
         self.assertEquals(resp.error, 0)
         self.assertEquals(resp.offset, 10)
 
+        # Consumer should still get all of them
         consumer = SimpleConsumer(self.client, "group1", "test_large_messages")
-        it = consumer.__iter__()
-        for i in range(10):
-            self.assertEquals(messages1[i], it.next().message)
-
-        consumer = SimpleConsumer(self.client, "group2", "test_large_messages", fetch_size_bytes=5120)
-        it = consumer.__iter__()
-        for i in range(10):
-            self.assertEquals(messages1[i], it.next().message)
-        for i in range(10):
-            self.assertEquals(messages2[i], it.next().message)
+        all_messages = messages1 + messages2
+        for i, message in enumerate(consumer):
+            self.assertEquals(all_messages[i], message.message)
+        self.assertEquals(i, 19)
 
 def random_string(l):
-    s = "".join(random.choice(string.printable) for i in xrange(l))
+    s = "".join(random.choice(string.letters) for i in xrange(l))
     return s
 
 if __name__ == "__main__":
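The rewritten test doubles as a usage example for the feature: a single SimpleConsumer left at its default fetch size is iterated straight through all twenty messages, small and large alike, relying on the new auto-growing fetch size; the second consumer with an explicit fetch_size_bytes=5120, which previously worked around the fixed fetch size, is no longer needed.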
