Skip to content

Commit 65ba882

Browse files
authored
Derive all api classes from Request / Response base classes (dpkp#1030)
1 parent a00f9ea commit 65ba882

13 files changed

+146
-88
lines changed

kafka/client.py

+2-6
Original file line numberDiff line numberDiff line change
@@ -257,18 +257,14 @@ def failed_payloads(payloads):
257257
continue
258258

259259
request = encoder_fn(payloads=broker_payloads)
260-
# decoder_fn=None signal that the server is expected to not
261-
# send a response. This probably only applies to
262-
# ProduceRequest w/ acks = 0
263-
expect_response = (decoder_fn is not None)
264-
future = conn.send(request, expect_response=expect_response)
260+
future = conn.send(request)
265261

266262
if future.failed():
267263
refresh_metadata = True
268264
failed_payloads(broker_payloads)
269265
continue
270266

271-
if not expect_response:
267+
if not request.expect_response():
272268
for payload in broker_payloads:
273269
topic_partition = (str(payload.topic), payload.partition)
274270
responses[topic_partition] = None

kafka/client_async.py

+1-6
Original file line numberDiff line numberDiff line change
@@ -464,12 +464,7 @@ def send(self, node_id, request):
464464
if not self._maybe_connect(node_id):
465465
return Future().failure(Errors.NodeNotReadyError(node_id))
466466

467-
# Every request gets a response, except one special case:
468-
expect_response = True
469-
if isinstance(request, tuple(ProduceRequest)) and request.required_acks == 0:
470-
expect_response = False
471-
472-
return self._conns[node_id].send(request, expect_response=expect_response)
467+
return self._conns[node_id].send(request)
473468

474469
def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
475470
"""Try to read and write to sockets.

kafka/conn.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,7 @@ def close(self, error=None):
525525
ifr.future.failure(error)
526526
self.config['state_change_callback'](self)
527527

528-
def send(self, request, expect_response=True):
528+
def send(self, request):
529529
"""send request, return Future()
530530
531531
Can block on network if request is larger than send_buffer_bytes
@@ -537,9 +537,9 @@ def send(self, request, expect_response=True):
537537
return future.failure(Errors.ConnectionError(str(self)))
538538
elif not self.can_send_more():
539539
return future.failure(Errors.TooManyInFlightRequests(str(self)))
540-
return self._send(request, expect_response=expect_response)
540+
return self._send(request)
541541

542-
def _send(self, request, expect_response=True):
542+
def _send(self, request):
543543
assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED)
544544
future = Future()
545545
correlation_id = self._next_correlation_id()
@@ -569,7 +569,7 @@ def _send(self, request, expect_response=True):
569569
return future.failure(error)
570570
log.debug('%s Request %d: %s', self, correlation_id, request)
571571

572-
if expect_response:
572+
if request.expect_response():
573573
ifr = InFlightRequest(request=request,
574574
correlation_id=correlation_id,
575575
response_type=request.RESPONSE_TYPE,

kafka/protocol/admin.py

+15-15
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
from __future__ import absolute_import
22

3-
from .struct import Struct
3+
from .api import Request, Response
44
from .types import Array, Boolean, Bytes, Int16, Int32, Schema, String
55

66

7-
class ApiVersionResponse_v0(Struct):
7+
class ApiVersionResponse_v0(Response):
88
API_KEY = 18
99
API_VERSION = 0
1010
SCHEMA = Schema(
@@ -16,7 +16,7 @@ class ApiVersionResponse_v0(Struct):
1616
)
1717

1818

19-
class ApiVersionRequest_v0(Struct):
19+
class ApiVersionRequest_v0(Request):
2020
API_KEY = 18
2121
API_VERSION = 0
2222
RESPONSE_TYPE = ApiVersionResponse_v0
@@ -27,7 +27,7 @@ class ApiVersionRequest_v0(Struct):
2727
ApiVersionResponse = [ApiVersionResponse_v0]
2828

2929

30-
class CreateTopicsResponse_v0(Struct):
30+
class CreateTopicsResponse_v0(Response):
3131
API_KEY = 19
3232
API_VERSION = 0
3333
SCHEMA = Schema(
@@ -37,7 +37,7 @@ class CreateTopicsResponse_v0(Struct):
3737
)
3838

3939

40-
class CreateTopicsResponse_v1(Struct):
40+
class CreateTopicsResponse_v1(Response):
4141
API_KEY = 19
4242
API_VERSION = 1
4343
SCHEMA = Schema(
@@ -48,7 +48,7 @@ class CreateTopicsResponse_v1(Struct):
4848
)
4949

5050

51-
class CreateTopicsRequest_v0(Struct):
51+
class CreateTopicsRequest_v0(Request):
5252
API_KEY = 19
5353
API_VERSION = 0
5454
RESPONSE_TYPE = CreateTopicsResponse_v0
@@ -67,7 +67,7 @@ class CreateTopicsRequest_v0(Struct):
6767
)
6868

6969

70-
class CreateTopicsRequest_v1(Struct):
70+
class CreateTopicsRequest_v1(Request):
7171
API_KEY = 19
7272
API_VERSION = 1
7373
RESPONSE_TYPE = CreateTopicsResponse_v1
@@ -91,7 +91,7 @@ class CreateTopicsRequest_v1(Struct):
9191
CreateTopicsResponse = [CreateTopicsResponse_v0, CreateTopicsRequest_v1]
9292

9393

94-
class DeleteTopicsResponse_v0(Struct):
94+
class DeleteTopicsResponse_v0(Response):
9595
API_KEY = 20
9696
API_VERSION = 0
9797
SCHEMA = Schema(
@@ -101,7 +101,7 @@ class DeleteTopicsResponse_v0(Struct):
101101
)
102102

103103

104-
class DeleteTopicsRequest_v0(Struct):
104+
class DeleteTopicsRequest_v0(Request):
105105
API_KEY = 20
106106
API_VERSION = 0
107107
RESPONSE_TYPE = DeleteTopicsResponse_v0
@@ -115,7 +115,7 @@ class DeleteTopicsRequest_v0(Struct):
115115
DeleteTopicsResponse = [DeleteTopicsResponse_v0]
116116

117117

118-
class ListGroupsResponse_v0(Struct):
118+
class ListGroupsResponse_v0(Response):
119119
API_KEY = 16
120120
API_VERSION = 0
121121
SCHEMA = Schema(
@@ -126,7 +126,7 @@ class ListGroupsResponse_v0(Struct):
126126
)
127127

128128

129-
class ListGroupsRequest_v0(Struct):
129+
class ListGroupsRequest_v0(Request):
130130
API_KEY = 16
131131
API_VERSION = 0
132132
RESPONSE_TYPE = ListGroupsResponse_v0
@@ -137,7 +137,7 @@ class ListGroupsRequest_v0(Struct):
137137
ListGroupsResponse = [ListGroupsResponse_v0]
138138

139139

140-
class DescribeGroupsResponse_v0(Struct):
140+
class DescribeGroupsResponse_v0(Response):
141141
API_KEY = 15
142142
API_VERSION = 0
143143
SCHEMA = Schema(
@@ -156,7 +156,7 @@ class DescribeGroupsResponse_v0(Struct):
156156
)
157157

158158

159-
class DescribeGroupsRequest_v0(Struct):
159+
class DescribeGroupsRequest_v0(Request):
160160
API_KEY = 15
161161
API_VERSION = 0
162162
RESPONSE_TYPE = DescribeGroupsResponse_v0
@@ -169,7 +169,7 @@ class DescribeGroupsRequest_v0(Struct):
169169
DescribeGroupsResponse = [DescribeGroupsResponse_v0]
170170

171171

172-
class SaslHandShakeResponse_v0(Struct):
172+
class SaslHandShakeResponse_v0(Response):
173173
API_KEY = 17
174174
API_VERSION = 0
175175
SCHEMA = Schema(
@@ -178,7 +178,7 @@ class SaslHandShakeResponse_v0(Struct):
178178
)
179179

180180

181-
class SaslHandShakeRequest_v0(Struct):
181+
class SaslHandShakeRequest_v0(Request):
182182
API_KEY = 17
183183
API_VERSION = 0
184184
RESPONSE_TYPE = SaslHandShakeResponse_v0

kafka/protocol/api.py

+49
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import absolute_import
22

3+
import abc
4+
35
from .struct import Struct
46
from .types import Int16, Int32, String, Schema
57

@@ -16,3 +18,50 @@ def __init__(self, request, correlation_id=0, client_id='kafka-python'):
1618
super(RequestHeader, self).__init__(
1719
request.API_KEY, request.API_VERSION, correlation_id, client_id
1820
)
21+
22+
23+
class Request(Struct):
24+
__metaclass__ = abc.ABCMeta
25+
26+
@abc.abstractproperty
27+
def API_KEY(self):
28+
"""Integer identifier for api request"""
29+
pass
30+
31+
@abc.abstractproperty
32+
def API_VERSION(self):
33+
"""Integer of api request version"""
34+
pass
35+
36+
@abc.abstractproperty
37+
def SCHEMA(self):
38+
"""An instance of Schema() representing the request structure"""
39+
pass
40+
41+
@abc.abstractproperty
42+
def RESPONSE_TYPE(self):
43+
"""The Response class associated with the api request"""
44+
pass
45+
46+
def expect_response(self):
47+
"""Override this method if an api request does not always generate a response"""
48+
return True
49+
50+
51+
class Response(Struct):
52+
__metaclass__ = abc.ABCMeta
53+
54+
@abc.abstractproperty
55+
def API_KEY(self):
56+
"""Integer identifier for api request/response"""
57+
pass
58+
59+
@abc.abstractproperty
60+
def API_VERSION(self):
61+
"""Integer of api request/response version"""
62+
pass
63+
64+
@abc.abstractproperty
65+
def SCHEMA(self):
66+
"""An instance of Schema() representing the response structure"""
67+
pass

kafka/protocol/commit.py

+15-15
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
from __future__ import absolute_import
22

3-
from .struct import Struct
3+
from .api import Request, Response
44
from .types import Array, Int16, Int32, Int64, Schema, String
55

66

7-
class OffsetCommitResponse_v0(Struct):
7+
class OffsetCommitResponse_v0(Response):
88
API_KEY = 8
99
API_VERSION = 0
1010
SCHEMA = Schema(
@@ -16,19 +16,19 @@ class OffsetCommitResponse_v0(Struct):
1616
)
1717

1818

19-
class OffsetCommitResponse_v1(Struct):
19+
class OffsetCommitResponse_v1(Response):
2020
API_KEY = 8
2121
API_VERSION = 1
2222
SCHEMA = OffsetCommitResponse_v0.SCHEMA
2323

2424

25-
class OffsetCommitResponse_v2(Struct):
25+
class OffsetCommitResponse_v2(Response):
2626
API_KEY = 8
2727
API_VERSION = 2
2828
SCHEMA = OffsetCommitResponse_v1.SCHEMA
2929

3030

31-
class OffsetCommitRequest_v0(Struct):
31+
class OffsetCommitRequest_v0(Request):
3232
API_KEY = 8
3333
API_VERSION = 0 # Zookeeper-backed storage
3434
RESPONSE_TYPE = OffsetCommitResponse_v0
@@ -43,7 +43,7 @@ class OffsetCommitRequest_v0(Struct):
4343
)
4444

4545

46-
class OffsetCommitRequest_v1(Struct):
46+
class OffsetCommitRequest_v1(Request):
4747
API_KEY = 8
4848
API_VERSION = 1 # Kafka-backed storage
4949
RESPONSE_TYPE = OffsetCommitResponse_v1
@@ -61,7 +61,7 @@ class OffsetCommitRequest_v1(Struct):
6161
)
6262

6363

64-
class OffsetCommitRequest_v2(Struct):
64+
class OffsetCommitRequest_v2(Request):
6565
API_KEY = 8
6666
API_VERSION = 2 # added retention_time, dropped timestamp
6767
RESPONSE_TYPE = OffsetCommitResponse_v2
@@ -87,7 +87,7 @@ class OffsetCommitRequest_v2(Struct):
8787
OffsetCommitResponse_v2]
8888

8989

90-
class OffsetFetchResponse_v0(Struct):
90+
class OffsetFetchResponse_v0(Response):
9191
API_KEY = 9
9292
API_VERSION = 0
9393
SCHEMA = Schema(
@@ -101,13 +101,13 @@ class OffsetFetchResponse_v0(Struct):
101101
)
102102

103103

104-
class OffsetFetchResponse_v1(Struct):
104+
class OffsetFetchResponse_v1(Response):
105105
API_KEY = 9
106106
API_VERSION = 1
107107
SCHEMA = OffsetFetchResponse_v0.SCHEMA
108108

109109

110-
class OffsetFetchResponse_v2(Struct):
110+
class OffsetFetchResponse_v2(Response):
111111
# Added in KIP-88
112112
API_KEY = 9
113113
API_VERSION = 2
@@ -123,7 +123,7 @@ class OffsetFetchResponse_v2(Struct):
123123
)
124124

125125

126-
class OffsetFetchRequest_v0(Struct):
126+
class OffsetFetchRequest_v0(Request):
127127
API_KEY = 9
128128
API_VERSION = 0 # zookeeper-backed storage
129129
RESPONSE_TYPE = OffsetFetchResponse_v0
@@ -135,14 +135,14 @@ class OffsetFetchRequest_v0(Struct):
135135
)
136136

137137

138-
class OffsetFetchRequest_v1(Struct):
138+
class OffsetFetchRequest_v1(Request):
139139
API_KEY = 9
140140
API_VERSION = 1 # kafka-backed storage
141141
RESPONSE_TYPE = OffsetFetchResponse_v1
142142
SCHEMA = OffsetFetchRequest_v0.SCHEMA
143143

144144

145-
class OffsetFetchRequest_v2(Struct):
145+
class OffsetFetchRequest_v2(Request):
146146
# KIP-88: Allows passing null topics to return offsets for all partitions
147147
# that the consumer group has a stored offset for, even if no consumer in
148148
# the group is currently consuming that partition.
@@ -158,7 +158,7 @@ class OffsetFetchRequest_v2(Struct):
158158
OffsetFetchResponse_v2]
159159

160160

161-
class GroupCoordinatorResponse_v0(Struct):
161+
class GroupCoordinatorResponse_v0(Response):
162162
API_KEY = 10
163163
API_VERSION = 0
164164
SCHEMA = Schema(
@@ -169,7 +169,7 @@ class GroupCoordinatorResponse_v0(Struct):
169169
)
170170

171171

172-
class GroupCoordinatorRequest_v0(Struct):
172+
class GroupCoordinatorRequest_v0(Request):
173173
API_KEY = 10
174174
API_VERSION = 0
175175
RESPONSE_TYPE = GroupCoordinatorResponse_v0

0 commit comments

Comments
 (0)