-import struct
+from .struct import Struct
+from .types import Int16, Int32, String, Schema
 
-from .types import (
-    Int8, Int16, Int32, Int64, Bytes, String, Array
-)
-from ..util import crc32
 
+class RequestHeader(Struct):
+    SCHEMA = Schema(
+        ('api_key', Int16),
+        ('api_version', Int16),
+        ('correlation_id', Int32),
+        ('client_id', String('utf-8'))
+    )
 
-class Message(object):
-    MAGIC_BYTE = 0
-    __slots__ = ('magic', 'attributes', 'key', 'value')
-
-    def __init__(self, value, key=None, magic=0, attributes=0):
-        self.magic = magic
-        self.attributes = attributes
-        self.key = key
-        self.value = value
-
-    def encode(self):
-        message = (
-            Int8.encode(self.magic) +
-            Int8.encode(self.attributes) +
-            Bytes.encode(self.key) +
-            Bytes.encode(self.value)
+    def __init__(self, request, correlation_id=0, client_id='kafka-python'):
+        super(RequestHeader, self).__init__(
+            request.API_KEY, request.API_VERSION, correlation_id, client_id
         )
-        return (
-            struct.pack('>I', crc32(message)) +
-            message
-        )
-
-
-class MessageSet(object):
-
-    @staticmethod
-    def _encode_one(message):
-        encoded = message.encode()
-        return (Int64.encode(0) + Int32.encode(len(encoded)) + encoded)
-
-    @staticmethod
-    def encode(messages):
-        return b''.join(map(MessageSet._encode_one, messages))
-
-
-class AbstractRequestResponse(object):
-    @classmethod
-    def encode(cls, message):
-        return Int32.encode(len(message)) + message
-
-
-class AbstractRequest(AbstractRequestResponse):
-    @classmethod
-    def encode(cls, request, correlation_id=0, client_id='kafka-python'):
-        request = (Int16.encode(cls.API_KEY) +
-                   Int16.encode(cls.API_VERSION) +
-                   Int32.encode(correlation_id) +
-                   String.encode(client_id) +
-                   request)
-        return super(AbstractRequest, cls).encode(request)
-
-
-class FetchRequest(AbstractRequest):
-    API_KEY = 1
-    API_VERSION = 0
-    __slots__ = ('replica_id', 'max_wait_time', 'min_bytes', 'topic_partition_offsets')
-
-    def __init__(self, topic_partition_offsets,
-                 max_wait_time=-1, min_bytes=0, replica_id=-1):
-        """
-        topic_partition_offsets is a dict of dicts of (offset, max_bytes) tuples
-        {
-            "TopicFoo": {
-                0: (1234, 1048576),
-                1: (1324, 1048576)
-            }
-        }
-        """
-        self.topic_partition_offsets = topic_partition_offsets
-        self.max_wait_time = max_wait_time
-        self.min_bytes = min_bytes
-        self.replica_id = replica_id
-
-    def encode(self):
-        request = (
-            Int32.encode(self.replica_id) +
-            Int32.encode(self.max_wait_time) +
-            Int32.encode(self.min_bytes) +
-            Array.encode([(
-                String.encode(topic) +
-                Array.encode([(
-                    Int32.encode(partition) +
-                    Int64.encode(offset) +
-                    Int32.encode(max_bytes)
-                ) for partition, (offset, max_bytes) in partitions.iteritems()])
-            ) for topic, partitions in self.topic_partition_offsets.iteritems()]))
-        return super(FetchRequest, self).encode(request)
-
-
-class OffsetRequest(AbstractRequest):
-    API_KEY = 2
-    API_VERSION = 0
-    __slots__ = ('replica_id', 'topic_partition_times')
-
-    def __init__(self, topic_partition_times, replica_id=-1):
-        """
-        topic_partition_times is a dict of dicts of (time, max_offsets) tuples
-        {
-            "TopicFoo": {
-                0: (-1, 1),
-                1: (-1, 1)
-            }
-        }
-        """
-        self.topic_partition_times = topic_partition_times
-        self.replica_id = replica_id
-
-    def encode(self):
-        request = (
-            Int32.encode(self.replica_id) +
-            Array.encode([(
-                String.encode(topic) +
-                Array.encode([(
-                    Int32.encode(partition) +
-                    Int64.encode(time) +
-                    Int32.encode(max_offsets)
-                ) for partition, (time, max_offsets) in partitions.iteritems()])
-            ) for topic, partitions in self.topic_partition_times.iteritems()]))
-        return super(OffsetRequest, self).encode(request)
-
-
-class MetadataRequest(AbstractRequest):
-    API_KEY = 3
-    API_VERSION = 0
-    __slots__ = ('topics')
-
-    def __init__(self, *topics):
-        self.topics = topics
-
-    def encode(self):
-        request = Array.encode(map(String.encode, self.topics))
-        return super(MetadataRequest, self).encode(request)
-
-
-# Non-user facing control APIs 4-7
-
-
-class OffsetCommitRequestV0(AbstractRequest):
-    API_KEY = 8
-    API_VERSION = 0
-    __slots__ = ('consumer_group_id', 'offsets')
-
-    def __init__(self, consumer_group_id, offsets):
-        """
-        offsets is a dict of dicts of (offset, metadata) tuples
-        {
-            "TopicFoo": {
-                0: (1234, ""),
-                1: (1243, "")
-            }
-        }
-        """
-        self.consumer_group_id = consumer_group_id
-        self.offsets = offsets
-
-    def encode(self):
-        request = (
-            String.encode(self.consumer_group_id) +
-            Array.encode([(
-                String.encode(topic) +
-                Array.encode([(
-                    Int32.encode(partition) +
-                    Int64.encode(offset) +
-                    String.encode(metadata)
-                ) for partition, (offset, metadata) in partitions.iteritems()])
-            ) for topic, partitions in self.offsets.iteritems()]))
-        return super(OffsetCommitRequestV0, self).encode(request)
-
-
-class OffsetCommitRequestV1(AbstractRequest):
-    API_KEY = 8
-    API_VERSION = 1
-    __slots__ = ('consumer_group_id', 'consumer_group_generation_id',
-                 'consumer_id', 'offsets')
-
-    def __init__(self, consumer_group_id, consumer_group_generation_id,
-                 consumer_id, offsets):
-        """
-        offsets is a dict of dicts of (offset, timestamp, metadata) tuples
-        {
-            "TopicFoo": {
-                0: (1234, 1448198827, ""),
-                1: (1243, 1448198827, "")
-            }
-        }
-        """
-        self.consumer_group_id = consumer_group_id
-        self.consumer_group_generation_id = consumer_group_generation_id
-        self.consumer_id = consumer_id
-        self.offsets = offsets
-
-    def encode(self):
-        request = (
-            String.encode(self.consumer_group_id) +
-            Int32.encode(self.consumer_group_generation_id) +
-            String.encode(self.consumer_id) +
-            Array.encode([(
-                String.encode(topic) +
-                Array.encode([(
-                    Int32.encode(partition) +
-                    Int64.encode(offset) +
-                    Int64.encode(timestamp) +
-                    String.encode(metadata)
-                ) for partition, (offset, timestamp, metadata) in partitions.iteritems()])
-            ) for topic, partitions in self.offsets.iteritems()]))
-        return super(OffsetCommitRequestV1, self).encode(request)
-
-
-class OffsetCommitRequest(AbstractRequest):
-    API_KEY = 8
-    API_VERSION = 2
-    __slots__ = ('consumer_group_id', 'consumer_group_generation_id',
-                 'consumer_id', 'retention_time', 'offsets')
-
-    def __init__(self, consumer_group_id, consumer_group_generation_id,
-                 consumer_id, retention_time, offsets):
-        """
-        offsets is a dict of dicts of (offset, metadata) tuples
-        {
-            "TopicFoo": {
-                0: (1234, ""),
-                1: (1243, "")
-            }
-        }
-        """
-        self.consumer_group_id = consumer_group_id
-        self.consumer_group_generation_id = consumer_group_generation_id
-        self.consumer_id = consumer_id
-        self.retention_time = retention_time
-        self.offsets = offsets
-
-    def encode(self):
-        request = (
-            String.encode(self.consumer_group_id) +
-            Int32.encode(self.consumer_group_generation_id) +
-            String.encode(self.consumer_id) +
-            Int64.encode(self.retention_time) +
-            Array.encode([(
-                String.encode(topic) +
-                Array.encode([(
-                    Int32.encode(partition) +
-                    Int64.encode(offset) +
-                    String.encode(metadata)
-                ) for partition, (offset, timestamp, metadata) in partitions.iteritems()])
-            ) for topic, partitions in self.offsets.iteritems()]))
-        return super(OffsetCommitRequest, self).encode(request)
-
-
-class OffsetFetchRequestV0(AbstractRequest):
-    API_KEY = 9
-    API_VERSION = 0
-    __slots__ = ('consumer_group', 'topic_partitions')
-
-    def __init__(self, consumer_group, topic_partitions):
-        """
-        offsets is a dict of lists of partition ints
-        {
-            "TopicFoo": [0, 1, 2]
-        }
-        """
-        self.consumer_group = consumer_group
-        self.topic_partitions = topic_partitions
-
-    def encode(self):
-        request = (
-            String.encode(self.consumer_group) +
-            Array.encode([(
-                String.encode(topic) +
-                Array.encode([Int32.encode(partition) for partition in partitions])
-            ) for topic, partitions in self.topic_partitions.iteritems()])
-        )
-        return super(OffsetFetchRequest, self).encode(request)
-
-
-class OffsetFetchRequest(OffsetFetchRequestV0):
-    """Identical to V0, but offsets fetched from kafka storage not zookeeper"""
-    API_VERSION = 1
-
-
-class GroupCoordinatorRequest(AbstractRequest):
-    API_KEY = 10
-    API_VERSION = 0
-    __slots__ = ('group_id',)
-
-    def __init__(self, group_id):
-        self.group_id = group_id
-
-    def encode(self):
-        request = String.encode(self.group_id)
-        return super(GroupCoordinatorRequest, self).encode(request)
-
-
-
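Minimal usage sketch for the new RequestHeader class above (not part of this diff). The _ExampleRequest stand-in, the import path, and the encode() call are assumptions: the diff only shows that RequestHeader.__init__ reads API_KEY and API_VERSION off the request object and that its fields are declared via SCHEMA, so this presumes Struct subclasses serialize those fields in order.

# Illustrative sketch only -- not part of this commit.
# Assumption: the module above is importable as kafka.protocol.api.
from kafka.protocol.api import RequestHeader

class _ExampleRequest(object):   # hypothetical stand-in for a real request type
    API_KEY = 3                  # e.g. the Metadata API
    API_VERSION = 0

# RequestHeader pulls API_KEY/API_VERSION from the request object (see __init__ in the diff).
header = RequestHeader(_ExampleRequest(), correlation_id=42, client_id='my-client')

# Assumption: Struct subclasses expose encode(), packing each SCHEMA field
# (api_key, api_version, correlation_id, client_id) in order onto the wire.
wire_bytes = header.encode()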