Skip to content

Commit 90c7294

Browse files
committed
Use version-indexed lists for request/response protocol structs
1 parent 452e7c2 commit 90c7294

20 files changed

+279
-164
lines changed

kafka/client_async.py

+9-9
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ def _bootstrap(self, hosts):
113113
time.sleep(next_at - now)
114114
self._last_bootstrap = time.time()
115115

116-
metadata_request = MetadataRequest([])
116+
metadata_request = MetadataRequest[0]([])
117117
for host, port, afi in hosts:
118118
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
119119
bootstrap = BrokerConnection(host, port, afi, **self.config)
@@ -299,7 +299,7 @@ def send(self, node_id, request):
299299

300300
# Every request gets a response, except one special case:
301301
expect_response = True
302-
if isinstance(request, ProduceRequest) and request.required_acks == 0:
302+
if isinstance(request, tuple(ProduceRequest)) and request.required_acks == 0:
303303
expect_response = False
304304

305305
return self._conns[node_id].send(request, expect_response=expect_response)
@@ -535,7 +535,7 @@ def _maybe_refresh_metadata(self):
535535
topics = []
536536

537537
if self._can_send_request(node_id):
538-
request = MetadataRequest(topics)
538+
request = MetadataRequest[0](topics)
539539
log.debug("Sending metadata request %s to node %s", request, node_id)
540540
future = self.send(node_id, request)
541541
future.add_callback(self.cluster.update_metadata)
@@ -610,7 +610,7 @@ def connect(node_id):
610610
import socket
611611
from .protocol.admin import ListGroupsRequest
612612
from .protocol.commit import (
613-
OffsetFetchRequest_v0, GroupCoordinatorRequest)
613+
OffsetFetchRequest, GroupCoordinatorRequest)
614614
from .protocol.metadata import MetadataRequest
615615

616616
# Socket errors are logged as exceptions and can alarm users. Mute them
@@ -623,18 +623,18 @@ def filter(self, record):
623623
log_filter = ConnFilter()
624624

625625
test_cases = [
626-
('0.9', ListGroupsRequest()),
627-
('0.8.2', GroupCoordinatorRequest('kafka-python-default-group')),
628-
('0.8.1', OffsetFetchRequest_v0('kafka-python-default-group', [])),
629-
('0.8.0', MetadataRequest([])),
626+
('0.9', ListGroupsRequest[0]()),
627+
('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')),
628+
('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])),
629+
('0.8.0', MetadataRequest[0]([])),
630630
]
631631

632632
logging.getLogger('kafka.conn').addFilter(log_filter)
633633
for version, request in test_cases:
634634
connect(node_id)
635635
f = self.send(node_id, request)
636636
time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes
637-
metadata = self.send(node_id, MetadataRequest([]))
637+
metadata = self.send(node_id, MetadataRequest[0]([]))
638638
self.poll(future=f)
639639
self.poll(future=metadata)
640640

kafka/conn.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ def _process_response(self, read_buffer):
321321

322322
# 0.8.2 quirk
323323
if (self.config['api_version'] == (0, 8, 2) and
324-
ifr.response_type is GroupCoordinatorResponse and
324+
ifr.response_type is GroupCoordinatorResponse[0] and
325325
ifr.correlation_id != 0 and
326326
recv_correlation_id == 0):
327327
log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse'

kafka/consumer/fetcher.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ def _send_offset_request(self, partition, timestamp):
472472
" wait for metadata refresh", partition)
473473
return Future().failure(Errors.LeaderNotAvailableError(partition))
474474

475-
request = OffsetRequest(
475+
request = OffsetRequest[0](
476476
-1, [(partition.topic, [(partition.partition, timestamp, 1)])]
477477
)
478478
# Client returns a future that only fails on network issues
@@ -552,7 +552,7 @@ def _create_fetch_requests(self):
552552

553553
requests = {}
554554
for node_id, partition_data in six.iteritems(fetchable):
555-
requests[node_id] = FetchRequest(
555+
requests[node_id] = FetchRequest[0](
556556
-1, # replica_id
557557
self.config['fetch_max_wait_ms'],
558558
self.config['fetch_min_bytes'],

kafka/coordinator/base.py

+14-15
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@
88

99
import kafka.errors as Errors
1010
from kafka.future import Future
11-
from kafka.protocol.commit import (GroupCoordinatorRequest,
12-
OffsetCommitRequest_v2 as OffsetCommitRequest)
11+
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
1312
from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest,
1413
LeaveGroupRequest, SyncGroupRequest)
1514
from .heartbeat import Heartbeat
@@ -79,8 +78,8 @@ def __init__(self, client, **configs):
7978
self.config[key] = configs[key]
8079

8180
self._client = client
82-
self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
83-
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
81+
self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID
82+
self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
8483
self.group_id = self.config['group_id']
8584
self.coordinator_id = None
8685
self.rejoin_needed = True
@@ -269,7 +268,7 @@ def _send_join_group_request(self):
269268

270269
# send a join group request to the coordinator
271270
log.info("(Re-)joining group %s", self.group_id)
272-
request = JoinGroupRequest(
271+
request = JoinGroupRequest[0](
273272
self.group_id,
274273
self.config['session_timeout_ms'],
275274
self.member_id,
@@ -324,7 +323,7 @@ def _handle_join_group_response(self, future, response):
324323
elif error_type is Errors.UnknownMemberIdError:
325324
# reset the member id and retry immediately
326325
error = error_type(self.member_id)
327-
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
326+
self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
328327
log.debug("Attempt to join group %s failed due to unknown member id",
329328
self.group_id)
330329
future.failure(error)
@@ -354,7 +353,7 @@ def _handle_join_group_response(self, future, response):
354353

355354
def _on_join_follower(self):
356355
# send follower's sync group with an empty assignment
357-
request = SyncGroupRequest(
356+
request = SyncGroupRequest[0](
358357
self.group_id,
359358
self.generation,
360359
self.member_id,
@@ -381,7 +380,7 @@ def _on_join_leader(self, response):
381380
except Exception as e:
382381
return Future().failure(e)
383382

384-
request = SyncGroupRequest(
383+
request = SyncGroupRequest[0](
385384
self.group_id,
386385
self.generation,
387386
self.member_id,
@@ -425,7 +424,7 @@ def _handle_sync_group_response(self, future, response):
425424
Errors.IllegalGenerationError):
426425
error = error_type()
427426
log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
428-
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
427+
self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
429428
future.failure(error)
430429
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
431430
Errors.NotCoordinatorForGroupError):
@@ -450,7 +449,7 @@ def _send_group_coordinator_request(self):
450449

451450
log.debug("Sending group coordinator request for group %s to broker %s",
452451
self.group_id, node_id)
453-
request = GroupCoordinatorRequest(self.group_id)
452+
request = GroupCoordinatorRequest[0](self.group_id)
454453
future = Future()
455454
_f = self._client.send(node_id, request)
456455
_f.add_callback(self._handle_group_coordinator_response, future)
@@ -514,14 +513,14 @@ def close(self):
514513
if not self.coordinator_unknown() and self.generation > 0:
515514
# this is a minimal effort attempt to leave the group. we do not
516515
# attempt any resending if the request fails or times out.
517-
request = LeaveGroupRequest(self.group_id, self.member_id)
516+
request = LeaveGroupRequest[0](self.group_id, self.member_id)
518517
future = self._client.send(self.coordinator_id, request)
519518
future.add_callback(self._handle_leave_group_response)
520519
future.add_errback(log.error, "LeaveGroup request failed: %s")
521520
self._client.poll(future=future)
522521

523-
self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
524-
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
522+
self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID
523+
self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
525524
self.rejoin_needed = True
526525

527526
def _handle_leave_group_response(self, response):
@@ -533,7 +532,7 @@ def _handle_leave_group_response(self, response):
533532

534533
def _send_heartbeat_request(self):
535534
"""Send a heartbeat request"""
536-
request = HeartbeatRequest(self.group_id, self.generation, self.member_id)
535+
request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id)
537536
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member
538537
future = Future()
539538
_f = self._client.send(self.coordinator_id, request)
@@ -569,7 +568,7 @@ def _handle_heartbeat_response(self, future, response):
569568
elif error_type is Errors.UnknownMemberIdError:
570569
log.warning("Heartbeat: local member_id was not recognized;"
571570
" this consumer needs to re-join")
572-
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
571+
self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
573572
self.rejoin_needed = True
574573
future.failure(error_type)
575574
elif error_type is Errors.GroupAuthorizationFailedError:

kafka/coordinator/consumer.py

+7-9
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@
1414
from .protocol import ConsumerProtocol
1515
from .. import errors as Errors
1616
from ..future import Future
17-
from ..protocol.commit import (
18-
OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
19-
OffsetFetchRequest_v0, OffsetFetchRequest_v1)
17+
from ..protocol.commit import OffsetCommitRequest, OffsetFetchRequest
2018
from ..structs import OffsetAndMetadata, TopicPartition
2119
from ..util import WeakMethod
2220

@@ -430,11 +428,11 @@ def _send_offset_commit_request(self, offsets):
430428
offset_data[tp.topic][tp.partition] = offset
431429

432430
if self.config['api_version'] >= (0, 9):
433-
request = OffsetCommitRequest_v2(
431+
request = OffsetCommitRequest[2](
434432
self.group_id,
435433
self.generation,
436434
self.member_id,
437-
OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME,
435+
OffsetCommitRequest[2].DEFAULT_RETENTION_TIME,
438436
[(
439437
topic, [(
440438
partition,
@@ -444,7 +442,7 @@ def _send_offset_commit_request(self, offsets):
444442
) for topic, partitions in six.iteritems(offset_data)]
445443
)
446444
elif self.config['api_version'] >= (0, 8, 2):
447-
request = OffsetCommitRequest_v1(
445+
request = OffsetCommitRequest[1](
448446
self.group_id, -1, '',
449447
[(
450448
topic, [(
@@ -456,7 +454,7 @@ def _send_offset_commit_request(self, offsets):
456454
) for topic, partitions in six.iteritems(offset_data)]
457455
)
458456
elif self.config['api_version'] >= (0, 8, 1):
459-
request = OffsetCommitRequest_v0(
457+
request = OffsetCommitRequest[0](
460458
self.group_id,
461459
[(
462460
topic, [(
@@ -593,12 +591,12 @@ def _send_offset_fetch_request(self, partitions):
593591
topic_partitions[tp.topic].add(tp.partition)
594592

595593
if self.config['api_version'] >= (0, 8, 2):
596-
request = OffsetFetchRequest_v1(
594+
request = OffsetFetchRequest[1](
597595
self.group_id,
598596
list(topic_partitions.items())
599597
)
600598
else:
601-
request = OffsetFetchRequest_v0(
599+
request = OffsetFetchRequest[0](
602600
self.group_id,
603601
list(topic_partitions.items())
604602
)

kafka/producer/sender.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
from ..version import __version__
1313
from ..protocol.produce import ProduceRequest
1414

15-
16-
1715
log = logging.getLogger(__name__)
1816

1917

@@ -258,7 +256,7 @@ def _produce_request(self, node_id, acks, timeout, batches):
258256
buf = batch.records.buffer()
259257
produce_records_by_partition[topic][partition] = buf
260258

261-
return ProduceRequest(
259+
return ProduceRequest[0](
262260
required_acks=acks,
263261
timeout=timeout,
264262
topics=[(topic, list(partition_info.items()))

kafka/protocol/admin.py

+18-6
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
from .types import Array, Bytes, Int16, Schema, String
33

44

5-
class ListGroupsResponse(Struct):
5+
class ListGroupsResponse_v0(Struct):
6+
API_KEY = 16
7+
API_VERSION = 0
68
SCHEMA = Schema(
79
('error_code', Int16),
810
('groups', Array(
@@ -11,14 +13,20 @@ class ListGroupsResponse(Struct):
1113
)
1214

1315

14-
class ListGroupsRequest(Struct):
16+
class ListGroupsRequest_v0(Struct):
1517
API_KEY = 16
1618
API_VERSION = 0
17-
RESPONSE_TYPE = ListGroupsResponse
19+
RESPONSE_TYPE = ListGroupsResponse_v0
1820
SCHEMA = Schema()
1921

2022

21-
class DescribeGroupsResponse(Struct):
23+
ListGroupsRequest = [ListGroupsRequest_v0]
24+
ListGroupsResponse = [ListGroupsResponse_v0]
25+
26+
27+
class DescribeGroupsResponse_v0(Struct):
28+
API_KEY = 15
29+
API_VERSION = 0
2230
SCHEMA = Schema(
2331
('groups', Array(
2432
('error_code', Int16),
@@ -35,10 +43,14 @@ class DescribeGroupsResponse(Struct):
3543
)
3644

3745

38-
class DescribeGroupsRequest(Struct):
46+
class DescribeGroupsRequest_v0(Struct):
3947
API_KEY = 15
4048
API_VERSION = 0
41-
RESPONSE_TYPE = DescribeGroupsResponse
49+
RESPONSE_TYPE = DescribeGroupsResponse_v0
4250
SCHEMA = Schema(
4351
('groups', Array(String('utf-8')))
4452
)
53+
54+
55+
DescribeGroupsRequest = [DescribeGroupsRequest_v0]
56+
DescribeGroupsResponse = [DescribeGroupsResponse_v0]

0 commit comments

Comments
 (0)