Skip to content

Commit 96530f6

Browse files
committed
Use Fetch/Produce API v2 for brokers >= 0.10 (uses message format v1) (dpkp#694)
1 parent 7941a2a commit 96530f6

File tree

8 files changed

+127
-10
lines changed

8 files changed

+127
-10
lines changed

kafka/consumer/fetcher.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,12 @@ def _create_fetch_requests(self):
581581
log.debug("Adding fetch request for partition %s at offset %d",
582582
partition, position)
583583

584-
version = 1 if self.config['api_version'] >= (0, 9) else 0
584+
if self.config['api_version'] >= (0, 10):
585+
version = 2
586+
elif self.config['api_version'] == (0, 9):
587+
version = 1
588+
else:
589+
version = 0
585590
requests = {}
586591
for node_id, partition_data in six.iteritems(fetchable):
587592
requests[node_id] = FetchRequest[version](

kafka/errors.py

+6
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,12 @@ class ClusterAuthorizationFailedError(BrokerResponseError):
310310
' use an inter-broker or administrative API.')
311311

312312

313+
class InvalidTimestampError(BrokerResponseError):
314+
errno = 32
315+
message = 'INVALID_TIMESTAMP'
316+
description = ('The timestamp of the message is out of acceptable range.')
317+
318+
313319
class KafkaUnavailableError(KafkaError):
314320
pass
315321

kafka/producer/kafka.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,8 @@ def __init__(self, **configs):
283283
if self.config['compression_type'] == 'lz4':
284284
assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
285285

286-
self._accumulator = RecordAccumulator(**self.config)
286+
message_version = 1 if self.config['api_version'] >= (0, 10) else 0
287+
self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
287288
self._metadata = client.cluster
288289
self._sender = Sender(client, self._metadata, self._accumulator,
289290
**self.config)

kafka/producer/sender.py

+13-3
Original file line numberDiff line numberDiff line change
@@ -174,11 +174,16 @@ def _handle_produce_response(self, batches, response):
174174
for batch in batches])
175175

176176
for topic, partitions in response.topics:
177-
for partition, error_code, offset in partitions:
177+
for partition_info in partitions:
178+
if response.API_VERSION < 2:
179+
partition, error_code, offset = partition_info
180+
ts = None
181+
else:
182+
partition, error_code, offset, ts = partition_info
178183
tp = TopicPartition(topic, partition)
179184
error = Errors.for_code(error_code)
180185
batch = batches_by_partition[tp]
181-
self._complete_batch(batch, error, offset)
186+
self._complete_batch(batch, error, offset, ts)
182187

183188
else:
184189
# this is the acks = 0 case, just complete all requests
@@ -258,7 +263,12 @@ def _produce_request(self, node_id, acks, timeout, batches):
258263
buf = batch.records.buffer()
259264
produce_records_by_partition[topic][partition] = buf
260265

261-
version = 1 if self.config['api_version'] >= (0, 9) else 0
266+
if self.config['api_version'] >= (0, 10):
267+
version = 2
268+
elif self.config['api_version'] == (0, 9):
269+
version = 1
270+
else:
271+
version = 0
262272
return ProduceRequest[version](
263273
required_acks=acks,
264274
timeout=timeout,

kafka/protocol/fetch.py

+15-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ class FetchResponse_v1(Struct):
3232
)
3333

3434

35+
class FetchResponse_v2(Struct):
36+
API_KEY = 1
37+
API_VERSION = 2
38+
SCHEMA = FetchResponse_v1.SCHEMA # message format changed internally
39+
40+
3541
class FetchRequest_v0(Struct):
3642
API_KEY = 1
3743
API_VERSION = 0
@@ -56,5 +62,12 @@ class FetchRequest_v1(Struct):
5662
SCHEMA = FetchRequest_v0.SCHEMA
5763

5864

59-
FetchRequest = [FetchRequest_v0, FetchRequest_v1]
60-
FetchResponse = [FetchResponse_v0, FetchResponse_v1]
65+
class FetchRequest_v2(Struct):
66+
API_KEY = 1
67+
API_VERSION = 2
68+
RESPONSE_TYPE = FetchResponse_v2
69+
SCHEMA = FetchRequest_v1.SCHEMA
70+
71+
72+
FetchRequest = [FetchRequest_v0, FetchRequest_v1, FetchRequest_v2]
73+
FetchResponse = [FetchResponse_v0, FetchResponse_v1, FetchResponse_v2]

kafka/protocol/produce.py

+24-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,21 @@ class ProduceResponse_v1(Struct):
3030
)
3131

3232

33+
class ProduceResponse_v2(Struct):
34+
API_KEY = 0
35+
API_VERSION = 2
36+
SCHEMA = Schema(
37+
('topics', Array(
38+
('topic', String('utf-8')),
39+
('partitions', Array(
40+
('partition', Int32),
41+
('error_code', Int16),
42+
('offset', Int64),
43+
('timestamp', Int64))))),
44+
('throttle_time_ms', Int32)
45+
)
46+
47+
3348
class ProduceRequest_v0(Struct):
3449
API_KEY = 0
3550
API_VERSION = 0
@@ -52,5 +67,12 @@ class ProduceRequest_v1(Struct):
5267
SCHEMA = ProduceRequest_v0.SCHEMA
5368

5469

55-
ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1]
56-
ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1]
70+
class ProduceRequest_v2(Struct):
71+
API_KEY = 0
72+
API_VERSION = 2
73+
RESPONSE_TYPE = ProduceResponse_v2
74+
SCHEMA = ProduceRequest_v1.SCHEMA
75+
76+
77+
ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2]
78+
ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2]

test/test_fetcher.py

+14-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
@pytest.fixture
1717
def client(mocker):
18-
return mocker.Mock(spec=KafkaClient)
18+
return mocker.Mock(spec=KafkaClient(bootstrap_servers=[]))
1919

2020

2121
@pytest.fixture
@@ -71,6 +71,19 @@ def test_init_fetches(fetcher, mocker):
7171
assert len(ret) == len(fetch_requests)
7272

7373

74+
@pytest.mark.parametrize(("api_version", "fetch_version"), [
75+
((0, 10), 2),
76+
((0, 9), 1),
77+
((0, 8), 0)
78+
])
79+
def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version):
80+
fetcher._client.in_flight_request_count.return_value = 0
81+
fetcher.config['api_version'] = api_version
82+
by_node = fetcher._create_fetch_requests()
83+
requests = by_node.values()
84+
assert all([isinstance(r, FetchRequest[fetch_version]) for r in requests])
85+
86+
7487
def test_update_fetch_positions(fetcher, mocker):
7588
mocker.patch.object(fetcher, '_reset_offset')
7689
partition = TopicPartition('foobar', 0)

test/test_sender.py

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# pylint: skip-file
2+
from __future__ import absolute_import
3+
4+
import io
5+
6+
import pytest
7+
8+
from kafka.client_async import KafkaClient
9+
from kafka.cluster import ClusterMetadata
10+
from kafka.producer.buffer import MessageSetBuffer
11+
from kafka.producer.sender import Sender
12+
from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
13+
import kafka.errors as Errors
14+
from kafka.future import Future
15+
from kafka.protocol.produce import ProduceRequest
16+
from kafka.structs import TopicPartition, OffsetAndMetadata
17+
18+
19+
@pytest.fixture
20+
def client(mocker):
21+
_cli = mocker.Mock(spec=KafkaClient(bootstrap_servers=[]))
22+
_cli.cluster = mocker.Mock(spec=ClusterMetadata())
23+
return _cli
24+
25+
26+
@pytest.fixture
27+
def accumulator():
28+
return RecordAccumulator()
29+
30+
31+
@pytest.fixture
32+
def sender(client, accumulator):
33+
return Sender(client, client.cluster, accumulator)
34+
35+
36+
@pytest.mark.parametrize(("api_version", "produce_version"), [
37+
((0, 10), 2),
38+
((0, 9), 1),
39+
((0, 8), 0)
40+
])
41+
def test_produce_request(sender, mocker, api_version, produce_version):
42+
sender.config['api_version'] = api_version
43+
tp = TopicPartition('foo', 0)
44+
records = MessageSetBuffer(io.BytesIO(), 100000)
45+
batch = RecordBatch(tp, records)
46+
produce_request = sender._produce_request(0, 0, 0, [batch])
47+
assert isinstance(produce_request, ProduceRequest[produce_version])

0 commit comments

Comments
 (0)