Skip to content

Use Fetch/Produce API v2 for brokers >= 0.10 #694

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,12 @@ def _create_fetch_requests(self):
log.debug("Adding fetch request for partition %s at offset %d",
partition, position)

version = 1 if self.config['api_version'] >= (0, 9) else 0
if self.config['api_version'] >= (0, 10):
version = 2
elif self.config['api_version'] == (0, 9):
version = 1
else:
version = 0
requests = {}
for node_id, partition_data in six.iteritems(fetchable):
requests[node_id] = FetchRequest[version](
Expand Down
6 changes: 6 additions & 0 deletions kafka/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,12 @@ class ClusterAuthorizationFailedError(BrokerResponseError):
' use an inter-broker or administrative API.')


class InvalidTimestampError(BrokerResponseError):
errno = 32
message = 'INVALID_TIMESTAMP'
description = ('The timestamp of the message is out of acceptable range.')


class KafkaUnavailableError(KafkaError):
pass

Expand Down
3 changes: 2 additions & 1 deletion kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ def __init__(self, **configs):
if self.config['compression_type'] == 'lz4':
assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'

self._accumulator = RecordAccumulator(**self.config)
message_version = 1 if self.config['api_version'] >= (0, 10) else 0
self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
self._metadata = client.cluster
self._sender = Sender(client, self._metadata, self._accumulator,
**self.config)
Expand Down
16 changes: 13 additions & 3 deletions kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,16 @@ def _handle_produce_response(self, batches, response):
for batch in batches])

for topic, partitions in response.topics:
for partition, error_code, offset in partitions:
for partition_info in partitions:
if response.API_VERSION < 2:
partition, error_code, offset = partition_info
ts = None
else:
partition, error_code, offset, ts = partition_info
tp = TopicPartition(topic, partition)
error = Errors.for_code(error_code)
batch = batches_by_partition[tp]
self._complete_batch(batch, error, offset)
self._complete_batch(batch, error, offset, ts)

else:
# this is the acks = 0 case, just complete all requests
Expand Down Expand Up @@ -258,7 +263,12 @@ def _produce_request(self, node_id, acks, timeout, batches):
buf = batch.records.buffer()
produce_records_by_partition[topic][partition] = buf

version = 1 if self.config['api_version'] >= (0, 9) else 0
if self.config['api_version'] >= (0, 10):
version = 2
elif self.config['api_version'] == (0, 9):
version = 1
else:
version = 0
return ProduceRequest[version](
required_acks=acks,
timeout=timeout,
Expand Down
17 changes: 15 additions & 2 deletions kafka/protocol/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ class FetchResponse_v1(Struct):
)


class FetchResponse_v2(Struct):
API_KEY = 1
API_VERSION = 2
SCHEMA = FetchResponse_v1.SCHEMA # message format changed internally


class FetchRequest_v0(Struct):
API_KEY = 1
API_VERSION = 0
Expand All @@ -56,5 +62,12 @@ class FetchRequest_v1(Struct):
SCHEMA = FetchRequest_v0.SCHEMA


FetchRequest = [FetchRequest_v0, FetchRequest_v1]
FetchResponse = [FetchResponse_v0, FetchResponse_v1]
class FetchRequest_v2(Struct):
API_KEY = 1
API_VERSION = 2
RESPONSE_TYPE = FetchResponse_v2
SCHEMA = FetchRequest_v1.SCHEMA


FetchRequest = [FetchRequest_v0, FetchRequest_v1, FetchRequest_v2]
FetchResponse = [FetchResponse_v0, FetchResponse_v1, FetchResponse_v2]
26 changes: 24 additions & 2 deletions kafka/protocol/produce.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ class ProduceResponse_v1(Struct):
)


class ProduceResponse_v2(Struct):
API_KEY = 0
API_VERSION = 2
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16),
('offset', Int64),
('timestamp', Int64))))),
('thottle_time_ms', Int32)
)


class ProduceRequest_v0(Struct):
API_KEY = 0
API_VERSION = 0
Expand All @@ -52,5 +67,12 @@ class ProduceRequest_v1(Struct):
SCHEMA = ProduceRequest_v0.SCHEMA


ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1]
ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1]
class ProduceRequest_v2(Struct):
API_KEY = 0
API_VERSION = 2
RESPONSE_TYPE = ProduceResponse_v2
SCHEMA = ProduceRequest_v1.SCHEMA


ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2]
ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2]
15 changes: 14 additions & 1 deletion test/test_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

@pytest.fixture
def client(mocker):
return mocker.Mock(spec=KafkaClient)
return mocker.Mock(spec=KafkaClient(bootstrap_servers=[]))


@pytest.fixture
Expand Down Expand Up @@ -71,6 +71,19 @@ def test_init_fetches(fetcher, mocker):
assert len(ret) == len(fetch_requests)


@pytest.mark.parametrize(("api_version", "fetch_version"), [
((0, 10), 2),
((0, 9), 1),
((0, 8), 0)
])
def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version):
fetcher._client.in_flight_request_count.return_value = 0
fetcher.config['api_version'] = api_version
by_node = fetcher._create_fetch_requests()
requests = by_node.values()
assert all([isinstance(r, FetchRequest[fetch_version]) for r in requests])


def test_update_fetch_positions(fetcher, mocker):
mocker.patch.object(fetcher, '_reset_offset')
partition = TopicPartition('foobar', 0)
Expand Down
47 changes: 47 additions & 0 deletions test/test_sender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# pylint: skip-file
from __future__ import absolute_import

import io

import pytest

from kafka.client_async import KafkaClient
from kafka.cluster import ClusterMetadata
from kafka.producer.buffer import MessageSetBuffer
from kafka.producer.sender import Sender
from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.produce import ProduceRequest
from kafka.structs import TopicPartition, OffsetAndMetadata


@pytest.fixture
def client(mocker):
_cli = mocker.Mock(spec=KafkaClient(bootstrap_servers=[]))
_cli.cluster = mocker.Mock(spec=ClusterMetadata())
return _cli


@pytest.fixture
def accumulator():
return RecordAccumulator()


@pytest.fixture
def sender(client, accumulator):
return Sender(client, client.cluster, accumulator)


@pytest.mark.parametrize(("api_version", "produce_version"), [
((0, 10), 2),
((0, 9), 1),
((0, 8), 0)
])
def test_produce_request(sender, mocker, api_version, produce_version):
sender.config['api_version'] = api_version
tp = TopicPartition('foo', 0)
records = MessageSetBuffer(io.BytesIO(), 100000)
batch = RecordBatch(tp, records)
produce_request = sender._produce_request(0, 0, 0, [batch])
assert isinstance(produce_request, ProduceRequest[produce_version])