Skip to content

Commit adc6a93

Browse files
committed
Merge pull request dpkp#628 from dpkp/kafka-2136
KAFKA-2136: support Fetch and Produce v1 (throttle_time_ms)
2 parents 734cb28 + 3d16f2f commit adc6a93

File tree

5 files changed

+57
-14
lines changed

5 files changed

+57
-14
lines changed

kafka/consumer/fetcher.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class Fetcher(six.Iterator):
3737
'max_partition_fetch_bytes': 1048576,
3838
'check_crcs': True,
3939
'iterator_refetch_records': 1, # undocumented -- interface may change
40+
'api_version': (0, 8, 0),
4041
}
4142

4243
def __init__(self, client, subscriptions, **configs):
@@ -531,7 +532,7 @@ def _create_fetch_requests(self):
531532
FetchRequests skipped if no leader, or node has requests in flight
532533
533534
Returns:
534-
dict: {node_id: FetchRequest, ...}
535+
dict: {node_id: FetchRequest, ...} (version depends on api_version)
535536
"""
536537
# create the fetch info as a dict of lists of partition info tuples
537538
# which can be passed to FetchRequest() via .items()
@@ -564,9 +565,10 @@ def _create_fetch_requests(self):
564565
log.debug("Adding fetch request for partition %s at offset %d",
565566
partition, position)
566567

568+
version = 1 if self.config['api_version'] >= (0, 9) else 0
567569
requests = {}
568570
for node_id, partition_data in six.iteritems(fetchable):
569-
requests[node_id] = FetchRequest[0](
571+
requests[node_id] = FetchRequest[version](
570572
-1, # replica_id
571573
self.config['fetch_max_wait_ms'],
572574
self.config['fetch_min_bytes'],

kafka/producer/sender.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class Sender(threading.Thread):
2727
'retries': 0,
2828
'request_timeout_ms': 30000,
2929
'client_id': 'kafka-python-' + __version__,
30+
'api_version': (0, 8, 0),
3031
}
3132

3233
def __init__(self, client, metadata, accumulator, **configs):
@@ -232,7 +233,7 @@ def _create_produce_requests(self, collated):
232233
collated: {node_id: [RecordBatch]}
233234
234235
Returns:
235-
dict: {node_id: ProduceRequest}
236+
dict: {node_id: ProduceRequest} (version depends on api_version)
236237
"""
237238
requests = {}
238239
for node_id, batches in six.iteritems(collated):
@@ -245,7 +246,7 @@ def _produce_request(self, node_id, acks, timeout, batches):
245246
"""Create a produce request from the given record batches.
246247
247248
Returns:
248-
ProduceRequest
249+
ProduceRequest (version depends on api_version)
249250
"""
250251
produce_records_by_partition = collections.defaultdict(dict)
251252
for batch in batches:
@@ -256,7 +257,8 @@ def _produce_request(self, node_id, acks, timeout, batches):
256257
buf = batch.records.buffer()
257258
produce_records_by_partition[topic][partition] = buf
258259

259-
return ProduceRequest[0](
260+
version = 1 if self.config['api_version'] >= (0, 9) else 0
261+
return ProduceRequest[version](
260262
required_acks=acks,
261263
timeout=timeout,
262264
topics=[(topic, list(partition_info.items()))

kafka/protocol/fetch.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,21 @@ class FetchResponse_v0(Struct):
1717
)
1818

1919

20+
class FetchResponse_v1(Struct):
21+
API_KEY = 1
22+
API_VERSION = 1
23+
SCHEMA = Schema(
24+
('throttle_time_ms', Int32),
25+
('topics', Array(
26+
('topics', String('utf-8')),
27+
('partitions', Array(
28+
('partition', Int32),
29+
('error_code', Int16),
30+
('highwater_offset', Int64),
31+
('message_set', MessageSet)))))
32+
)
33+
34+
2035
class FetchRequest_v0(Struct):
2136
API_KEY = 1
2237
API_VERSION = 0
@@ -34,5 +49,12 @@ class FetchRequest_v0(Struct):
3449
)
3550

3651

37-
FetchRequest = [FetchRequest_v0]
38-
FetchResponse = [FetchResponse_v0]
52+
class FetchRequest_v1(Struct):
53+
API_KEY = 1
54+
API_VERSION = 1
55+
RESPONSE_TYPE = FetchResponse_v1
56+
SCHEMA = FetchRequest_v0.SCHEMA
57+
58+
59+
FetchRequest = [FetchRequest_v0, FetchRequest_v1]
60+
FetchResponse = [FetchResponse_v0, FetchResponse_v1]

kafka/protocol/legacy.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -336,11 +336,7 @@ def encode_offset_fetch_request(cls, group, payloads, from_kafka=False):
336336
payloads: list of OffsetFetchRequestPayload
337337
from_kafka: bool, default False, set True for Kafka-committed offsets
338338
"""
339-
if from_kafka:
340-
version = 1
341-
else:
342-
version = 0
343-
339+
version = 1 if from_kafka else 0
344340
return kafka.protocol.commit.OffsetFetchRequest[version](
345341
consumer_group=group,
346342
topics=[(

kafka/protocol/produce.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,20 @@ class ProduceResponse_v0(Struct):
1616
)
1717

1818

19+
class ProduceResponse_v1(Struct):
20+
API_KEY = 0
21+
API_VERSION = 1
22+
SCHEMA = Schema(
23+
('topics', Array(
24+
('topic', String('utf-8')),
25+
('partitions', Array(
26+
('partition', Int32),
27+
('error_code', Int16),
28+
('offset', Int64))))),
29+
('throttle_time_ms', Int32)
30+
)
31+
32+
1933
class ProduceRequest_v0(Struct):
2034
API_KEY = 0
2135
API_VERSION = 0
@@ -31,5 +45,12 @@ class ProduceRequest_v0(Struct):
3145
)
3246

3347

34-
ProduceRequest = [ProduceRequest_v0]
35-
ProduceResponse = [ProduceResponse_v0]
48+
class ProduceRequest_v1(Struct):
49+
API_KEY = 0
50+
API_VERSION = 1
51+
RESPONSE_TYPE = ProduceResponse_v1
52+
SCHEMA = ProduceRequest_v0.SCHEMA
53+
54+
55+
ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1]
56+
ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1]

0 commit comments

Comments
 (0)