Skip to content

Commit d401bde

Browse files
committed
Merge pull request dpkp#678 from dpkp/check_version_0_10
Add protocol support for ApiVersionRequest
2 parents 874f487 + a3b7dca commit d401bde

File tree

4 files changed

+26
-3
lines changed

4 files changed

+26
-3
lines changed

kafka/conn.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ def check_version(self, timeout=2, strict=False):
520520
# vanilla MetadataRequest. If the server did not recognize the first
521521
# request, both will be failed with a ConnectionError that wraps
522522
# socket.error (32, 54, or 104)
523-
from .protocol.admin import ListGroupsRequest
523+
from .protocol.admin import ApiVersionRequest, ListGroupsRequest
524524
from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
525525
from .protocol.metadata import MetadataRequest
526526

@@ -536,6 +536,7 @@ def filter(self, record):
536536
log.addFilter(log_filter)
537537

538538
test_cases = [
539+
('0.10', ApiVersionRequest[0]()),
539540
('0.9', ListGroupsRequest[0]()),
540541
('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')),
541542
('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])),

kafka/consumer/group.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ def __init__(self, *topics, **configs):
225225
# Check Broker Version if not set explicitly
226226
if self.config['api_version'] == 'auto':
227227
self.config['api_version'] = self._client.check_version()
228-
assert self.config['api_version'] in ('0.9', '0.8.2', '0.8.1', '0.8.0'), 'Unrecognized api version'
228+
assert self.config['api_version'] in ('0.10', '0.9', '0.8.2', '0.8.1', '0.8.0'), 'Unrecognized api version'
229229

230230
# Convert api_version config to tuple for easy comparisons
231231
self.config['api_version'] = tuple(

kafka/producer/kafka.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ def __init__(self, **configs):
268268
# Check Broker Version if not set explicitly
269269
if self.config['api_version'] == 'auto':
270270
self.config['api_version'] = client.check_version()
271-
assert self.config['api_version'] in ('0.9', '0.8.2', '0.8.1', '0.8.0')
271+
assert self.config['api_version'] in ('0.10', '0.9', '0.8.2', '0.8.1', '0.8.0')
272272

273273
# Convert api_version config to tuple for easy comparisons
274274
self.config['api_version'] = tuple(

kafka/protocol/admin.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,28 @@
22
from .types import Array, Bytes, Int16, Schema, String
33

44

5+
class ApiVersionResponse_v0(Struct):
6+
API_KEY = 18
7+
API_VERSION = 0
8+
SCHEMA = Schema(
9+
('error_code', Int16),
10+
('api_versions', Array(
11+
('api_key', Int16),
12+
('min_version', Int16),
13+
('max_version', Int16))))
14+
15+
16+
class ApiVersionRequest_v0(Struct):
17+
API_KEY = 18
18+
API_VERSION = 0
19+
RESPONSE_TYPE = ApiVersionResponse_v0
20+
SCHEMA = Schema()
21+
22+
23+
ApiVersionRequest = [ApiVersionRequest_v0]
24+
ApiVersionResponse = [ApiVersionResponse_v0]
25+
26+
527
class ListGroupsResponse_v0(Struct):
628
API_KEY = 16
729
API_VERSION = 0

0 commit comments

Comments
 (0)