Skip to content

fetch commit offsets in base consumer unless group is None #356

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 31, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions kafka/consumer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import kafka.common
from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
UnknownTopicOrPartitionError
UnknownTopicOrPartitionError, check_error
)

from kafka.util import ReentrantTimer
Expand Down Expand Up @@ -68,29 +68,43 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True,
self.commit)
self.commit_timer.start()

if auto_commit:
# Set initial offsets
if self.group is not None:
self.fetch_last_known_offsets(partitions)
else:
for partition in partitions:
self.offsets[partition] = 0


def fetch_last_known_offsets(self, partitions=None):
if self.group is None:
raise ValueError('KafkaClient.group must not be None')

if not partitions:
partitions = self.client.get_partition_ids_for_topic(self.topic)

def get_or_init_offset(resp):
responses = self.client.send_offset_fetch_request(
self.group,
[OffsetFetchRequest(self.topic, p) for p in partitions],
fail_on_error=False
)

for resp in responses:
try:
kafka.common.check_error(resp)
return resp.offset
check_error(resp)
# API spec says the server won't set an error here,
# but 0.8.1.1 actually does...
except UnknownTopicOrPartitionError:
return 0
pass

for partition in partitions:
req = OffsetFetchRequest(self.topic, partition)
(resp,) = self.client.send_offset_fetch_request(self.group, [req],
fail_on_error=False)
self.offsets[partition] = get_or_init_offset(resp)
self.fetch_offsets = self.offsets.copy()
# An offset of -1 signals that no commit is currently stored
if resp.offset == -1:
self.offsets[resp.partition] = 0

# Otherwise an offset was previously committed, so resume
# from the stored offset
else:
self.offsets[resp.partition] = resp.offset

def commit(self, partitions=None):
"""
Expand Down
2 changes: 2 additions & 0 deletions kafka/consumer/multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ class MultiProcessConsumer(Consumer):
Arguments:
client: a connected KafkaClient
group: a name for this consumer, used for offset storage and must be unique
If you are connecting to a server that does not support offset
commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
topic: the topic to consume

Keyword Arguments:
Expand Down
2 changes: 2 additions & 0 deletions kafka/consumer/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class SimpleConsumer(Consumer):
Arguments:
client: a connected KafkaClient
group: a name for this consumer, used for offset storage and must be unique
If you are connecting to a server that does not support offset
commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
topic: the topic to consume

Keyword Arguments:
Expand Down
75 changes: 74 additions & 1 deletion test/test_consumer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def assert_message_count(self, messages, num_messages):
def consumer(self, **kwargs):
if os.environ['KAFKA_VERSION'] == "0.8.0":
# Kafka 0.8.0 simply doesn't support offset requests, so hard-code it as off
kwargs['group'] = None
kwargs['auto_commit'] = False
else:
kwargs.setdefault('auto_commit', True)
Expand Down Expand Up @@ -127,6 +128,23 @@ def test_simple_consumer_no_reset(self):
with self.assertRaises(OffsetOutOfRangeError):
consumer.get_message()

@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
def test_simple_consumer_load_initial_offsets(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))

# Create 1st consumer and change offsets
consumer = self.consumer()
self.assertEqual(consumer.offsets, {0: 0, 1: 0})
consumer.offsets.update({0:51, 1:101})
# Update counter after manual offsets update
consumer.count_since_commit += 1
consumer.commit()

# Create 2nd consumer and check initial offsets
consumer = self.consumer(auto_commit=False)
self.assertEqual(consumer.offsets, {0: 51, 1: 101})

@kafka_versions("all")
def test_simple_consumer__seek(self):
self.send_messages(0, range(0, 100))
Expand Down Expand Up @@ -243,7 +261,9 @@ def test_multi_proc_pending(self):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))

consumer = MultiProcessConsumer(self.client, "group1", self.topic,
# set group to None and auto_commit to False to avoid interactions w/
# offset commit/fetch apis
consumer = MultiProcessConsumer(self.client, None, self.topic,
auto_commit=False, iter_timeout=0)

self.assertEqual(consumer.pending(), 20)
Expand All @@ -252,6 +272,24 @@ def test_multi_proc_pending(self):

consumer.stop()

@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
def test_multi_process_consumer_load_initial_offsets(self):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))

# Create 1st consumer and change offsets
consumer = self.consumer()
self.assertEqual(consumer.offsets, {0: 0, 1: 0})
consumer.offsets.update({0:5, 1:15})
# Update counter after manual offsets update
consumer.count_since_commit += 1
consumer.commit()

# Create 2nd consumer and check initial offsets
consumer = self.consumer(consumer = MultiProcessConsumer,
auto_commit=False)
self.assertEqual(consumer.offsets, {0: 5, 1: 15})

@kafka_versions("all")
def test_large_messages(self):
# Produce 10 "normal" size messages
Expand Down Expand Up @@ -327,6 +365,41 @@ def test_offset_behavior__resuming_behavior(self):
consumer1.stop()
consumer2.stop()

@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
def test_multi_process_offset_behavior__resuming_behavior(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))

# Start a consumer
consumer1 = self.consumer(
consumer=MultiProcessConsumer,
auto_commit_every_t = None,
auto_commit_every_n = 20,
)

# Grab the first 195 messages
output_msgs1 = []
idx = 0
for message in consumer1:
output_msgs1.append(message.message.value)
idx += 1
if idx >= 195:
break
self.assert_message_count(output_msgs1, 195)

# The total offset across both partitions should be at 180
consumer2 = self.consumer(
consumer=MultiProcessConsumer,
auto_commit_every_t = None,
auto_commit_every_n = 20,
)

# 181-200
self.assert_message_count([ message for message in consumer2 ], 20)

consumer1.stop()
consumer2.stop()

# TODO: Make this a unit test -- should not require integration
@kafka_versions("all")
def test_fetch_buffer_size(self):
Expand Down
2 changes: 1 addition & 1 deletion test/test_failover_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def assert_message_count(self, topic, check_count, timeout=10, partitions=None):

client = KafkaClient(hosts)
group = random_string(10)
consumer = SimpleConsumer(client, group, topic,
consumer = SimpleConsumer(client, None, topic,
partitions=partitions,
auto_commit=False,
iter_timeout=timeout)
Expand Down