Skip to content

Commit a85e09d

Browse files
Dana Powers, zackdever
Dana Powers
authored and committed
Rework protocol type definition: AbstractType, Schema, Struct
1 parent e24a4d5 commit a85e09d

File tree

10 files changed

+461
-371
lines changed

10 files changed

+461
-371
lines changed

kafka/protocol/abstract.py

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import abc
2+
3+
4+
class AbstractType(abc.ABCMeta('_AbstractTypeBase', (object,), {})):
    """Abstract interface requiring ``encode`` and ``decode``.

    The base class is created by calling ``abc.ABCMeta`` directly so the
    metaclass takes effect on both Python 2 and Python 3. A class-body
    ``__metaclass__`` attribute is honored only by Python 2; Python 3
    ignores it, which leaves ``@abc.abstractmethod`` unenforced there.
    """

    @abc.abstractmethod
    def encode(cls, value):
        """Encode ``value``; must be overridden by every concrete type."""
        pass

    @abc.abstractmethod
    def decode(cls, data):
        """Decode from ``data``; must be overridden by every concrete type."""
        pass

kafka/protocol/api.py

+12-297
Original file line numberDiff line numberDiff line change
@@ -1,301 +1,16 @@
1-
import struct
1+
from .struct import Struct
2+
from .types import Int16, Int32, String, Schema
23

3-
from .types import (
4-
Int8, Int16, Int32, Int64, Bytes, String, Array
5-
)
6-
from ..util import crc32
74

5+
class RequestHeader(Struct):
    """Struct describing a request header.

    SCHEMA lists the header fields in order: api_key, api_version,
    correlation_id and client_id.
    """

    SCHEMA = Schema(
        ('api_key', Int16),
        ('api_version', Int16),
        ('correlation_id', Int32),
        ('client_id', String('utf-8')),
    )
812

9-
class Message(object):
10-
MAGIC_BYTE = 0
11-
__slots__ = ('magic', 'attributes', 'key', 'value')
12-
13-
def __init__(self, value, key=None, magic=0, attributes=0):
14-
self.magic = magic
15-
self.attributes = attributes
16-
self.key = key
17-
self.value = value
18-
19-
def encode(self):
20-
message = (
21-
Int8.encode(self.magic) +
22-
Int8.encode(self.attributes) +
23-
Bytes.encode(self.key) +
24-
Bytes.encode(self.value)
13+
def __init__(self, request, correlation_id=0, client_id='kafka-python'):
    """Build the header for ``request``.

    The api key and api version are read from ``request.API_KEY`` and
    ``request.API_VERSION``; all four values are passed positionally to
    the parent initializer.
    """
    header_values = (
        request.API_KEY,
        request.API_VERSION,
        correlation_id,
        client_id,
    )
    super(RequestHeader, self).__init__(*header_values)
26-
return (
27-
struct.pack('>I', crc32(message)) +
28-
message
29-
)
30-
31-
32-
class MessageSet(object):
    """Static helpers that serialize a sequence of messages."""

    @staticmethod
    def _encode_one(message):
        """Return Int64 zero + Int32 payload length + the encoded message."""
        payload = message.encode()
        prefix = Int64.encode(0)
        size = Int32.encode(len(payload))
        return prefix + size + payload

    @staticmethod
    def encode(messages):
        """Concatenate the framed encoding of every item in ``messages``."""
        return b''.join(MessageSet._encode_one(item) for item in messages)
42-
43-
44-
class AbstractRequestResponse(object):
    """Base class whose ``encode`` prefixes a payload with its Int32 length."""

    @classmethod
    def encode(cls, message):
        """Return ``Int32.encode(len(message))`` followed by ``message``."""
        size_prefix = Int32.encode(len(message))
        return size_prefix + message
48-
49-
50-
class AbstractRequest(AbstractRequestResponse):
    """Base request: prepends the header fields, then defers to the parent.

    Subclasses are expected to provide ``API_KEY`` and ``API_VERSION``.
    """

    @classmethod
    def encode(cls, request, correlation_id=0, client_id='kafka-python'):
        """Prefix ``request`` with api key, api version, correlation id and
        client id, then hand the result to the parent ``encode``."""
        api_key = Int16.encode(cls.API_KEY)
        api_version = Int16.encode(cls.API_VERSION)
        correlation = Int32.encode(correlation_id)
        client = String.encode(client_id)
        framed = api_key + api_version + correlation + client + request
        return super(AbstractRequest, cls).encode(framed)
59-
60-
61-
class FetchRequest(AbstractRequest):
    """Fetch request (API key 1, version 0)."""
    API_KEY = 1
    API_VERSION = 0
    __slots__ = ('replica_id', 'max_wait_time', 'min_bytes', 'topic_partition_offsets')

    def __init__(self, topic_partition_offsets,
                 max_wait_time=-1, min_bytes=0, replica_id=-1):
        """
        topic_partition_offsets is a dict of dicts of (offset, max_bytes) tuples
        {
          "TopicFoo": {
            0: (1234, 1048576),
            1: (1324, 1048576)
          }
        }
        """
        self.topic_partition_offsets = topic_partition_offsets
        self.max_wait_time = max_wait_time
        self.min_bytes = min_bytes
        self.replica_id = replica_id

    def encode(self):
        """Encode the request fields and pass them to the parent ``encode``."""
        # dict.items() works on Python 2 and 3; iteritems() is Python 2 only.
        topics = [
            String.encode(topic) +
            Array.encode([
                Int32.encode(partition) +
                Int64.encode(offset) +
                Int32.encode(max_bytes)
                for partition, (offset, max_bytes) in partitions.items()
            ])
            for topic, partitions in self.topic_partition_offsets.items()
        ]
        request = (
            Int32.encode(self.replica_id) +
            Int32.encode(self.max_wait_time) +
            Int32.encode(self.min_bytes) +
            Array.encode(topics)
        )
        return super(FetchRequest, self).encode(request)
96-
97-
98-
class OffsetRequest(AbstractRequest):
    """Offset request (API key 2, version 0)."""
    API_KEY = 2
    API_VERSION = 0
    __slots__ = ('replica_id', 'topic_partition_times')

    def __init__(self, topic_partition_times, replica_id=-1):
        """
        topic_partition_times is a dict of dicts of (time, max_offsets) tuples
        {
          "TopicFoo": {
            0: (-1, 1),
            1: (-1, 1)
          }
        }
        """
        self.topic_partition_times = topic_partition_times
        self.replica_id = replica_id

    def encode(self):
        """Encode the request fields and pass them to the parent ``encode``."""
        # dict.items() works on Python 2 and 3; iteritems() is Python 2 only.
        topics = [
            String.encode(topic) +
            Array.encode([
                Int32.encode(partition) +
                Int64.encode(time) +
                Int32.encode(max_offsets)
                for partition, (time, max_offsets) in partitions.items()
            ])
            for topic, partitions in self.topic_partition_times.items()
        ]
        request = Int32.encode(self.replica_id) + Array.encode(topics)
        return super(OffsetRequest, self).encode(request)
128-
129-
130-
class MetadataRequest(AbstractRequest):
    """Metadata request (API key 3, version 0) for the given topics."""
    API_KEY = 3
    API_VERSION = 0
    # One-element tuple; a bare string only works by accident of how
    # __slots__ treats a single str.
    __slots__ = ('topics',)

    def __init__(self, *topics):
        """Store the positional ``topics`` as a tuple."""
        self.topics = topics

    def encode(self):
        """Encode the topic array and pass it to the parent ``encode``."""
        # A list (not map()) so Array.encode receives a real sequence on
        # Python 3 as well, where map() returns a lazy iterator.
        encoded_topics = [String.encode(topic) for topic in self.topics]
        request = Array.encode(encoded_topics)
        return super(MetadataRequest, self).encode(request)
141-
142-
143-
# Non-user facing control APIs 4-7
144-
145-
146-
class OffsetCommitRequestV0(AbstractRequest):
    """Offset commit request (API key 8, version 0)."""
    API_KEY = 8
    API_VERSION = 0
    __slots__ = ('consumer_group_id', 'offsets')

    def __init__(self, consumer_group_id, offsets):
        """
        offsets is a dict of dicts of (offset, metadata) tuples
        {
          "TopicFoo": {
            0: (1234, ""),
            1: (1243, "")
          }
        }
        """
        self.consumer_group_id = consumer_group_id
        self.offsets = offsets

    def encode(self):
        """Encode the request fields and pass them to the parent ``encode``."""
        # dict.items() works on Python 2 and 3; iteritems() is Python 2 only.
        topics = [
            String.encode(topic) +
            Array.encode([
                Int32.encode(partition) +
                Int64.encode(offset) +
                String.encode(metadata)
                for partition, (offset, metadata) in partitions.items()
            ])
            for topic, partitions in self.offsets.items()
        ]
        request = String.encode(self.consumer_group_id) + Array.encode(topics)
        return super(OffsetCommitRequestV0, self).encode(request)
176-
177-
178-
class OffsetCommitRequestV1(AbstractRequest):
    """Offset commit request (API key 8, version 1)."""
    API_KEY = 8
    API_VERSION = 1
    __slots__ = ('consumer_group_id', 'consumer_group_generation_id',
                 'consumer_id', 'offsets')

    def __init__(self, consumer_group_id, consumer_group_generation_id,
                 consumer_id, offsets):
        """
        offsets is a dict of dicts of (offset, timestamp, metadata) tuples
        {
          "TopicFoo": {
            0: (1234, 1448198827, ""),
            1: (1243, 1448198827, "")
          }
        }
        """
        self.consumer_group_id = consumer_group_id
        self.consumer_group_generation_id = consumer_group_generation_id
        self.consumer_id = consumer_id
        self.offsets = offsets

    def encode(self):
        """Encode the request fields and pass them to the parent ``encode``."""
        # dict.items() works on Python 2 and 3; iteritems() is Python 2 only.
        topics = [
            String.encode(topic) +
            Array.encode([
                Int32.encode(partition) +
                Int64.encode(offset) +
                Int64.encode(timestamp) +
                String.encode(metadata)
                for partition, (offset, timestamp, metadata) in partitions.items()
            ])
            for topic, partitions in self.offsets.items()
        ]
        request = (
            String.encode(self.consumer_group_id) +
            Int32.encode(self.consumer_group_generation_id) +
            String.encode(self.consumer_id) +
            Array.encode(topics)
        )
        return super(OffsetCommitRequestV1, self).encode(request)
215-
216-
217-
class OffsetCommitRequest(AbstractRequest):
    """Offset commit request (API key 8, version 2)."""
    API_KEY = 8
    API_VERSION = 2
    __slots__ = ('consumer_group_id', 'consumer_group_generation_id',
                 'consumer_id', 'retention_time', 'offsets')

    def __init__(self, consumer_group_id, consumer_group_generation_id,
                 consumer_id, retention_time, offsets):
        """
        offsets is a dict of dicts of (offset, metadata) tuples
        {
          "TopicFoo": {
            0: (1234, ""),
            1: (1243, "")
          }
        }
        """
        self.consumer_group_id = consumer_group_id
        self.consumer_group_generation_id = consumer_group_generation_id
        self.consumer_id = consumer_id
        self.retention_time = retention_time
        self.offsets = offsets

    def encode(self):
        """Encode the request fields and pass them to the parent ``encode``."""
        # Each partition value is an (offset, metadata) pair, as documented on
        # __init__; no timestamp is encoded in this version. Unpacking three
        # values here would raise ValueError for correctly shaped input.
        # dict.items() works on Python 2 and 3; iteritems() is Python 2 only.
        topics = [
            String.encode(topic) +
            Array.encode([
                Int32.encode(partition) +
                Int64.encode(offset) +
                String.encode(metadata)
                for partition, (offset, metadata) in partitions.items()
            ])
            for topic, partitions in self.offsets.items()
        ]
        request = (
            String.encode(self.consumer_group_id) +
            Int32.encode(self.consumer_group_generation_id) +
            String.encode(self.consumer_id) +
            Int64.encode(self.retention_time) +
            Array.encode(topics)
        )
        return super(OffsetCommitRequest, self).encode(request)
255-
256-
257-
class OffsetFetchRequestV0(AbstractRequest):
    """Offset fetch request (API key 9, version 0)."""
    API_KEY = 9
    API_VERSION = 0
    __slots__ = ('consumer_group', 'topic_partitions')

    def __init__(self, consumer_group, topic_partitions):
        """
        topic_partitions is a dict of lists of partition ints
        {
          "TopicFoo": [0, 1, 2]
        }
        """
        self.consumer_group = consumer_group
        self.topic_partitions = topic_partitions

    def encode(self):
        """Encode the request fields and pass them to the parent ``encode``."""
        # dict.items() works on Python 2 and 3; iteritems() is Python 2 only.
        topics = [
            String.encode(topic) +
            Array.encode([Int32.encode(partition) for partition in partitions])
            for topic, partitions in self.topic_partitions.items()
        ]
        request = String.encode(self.consumer_group) + Array.encode(topics)
        # super() must name THIS class. Naming a subclass raises TypeError for
        # V0 instances and re-enters this method for subclass instances.
        return super(OffsetFetchRequestV0, self).encode(request)
281-
282-
283-
class OffsetFetchRequest(OffsetFetchRequestV0):
    """Version 1 of the offset fetch request.

    Identical to V0, except that offsets are fetched from kafka storage
    rather than zookeeper.
    """

    API_VERSION = 1
286-
287-
288-
class GroupCoordinatorRequest(AbstractRequest):
    """Group coordinator request (API key 10, version 0) for one group id."""

    API_KEY = 10
    API_VERSION = 0
    __slots__ = ('group_id',)

    def __init__(self, group_id):
        """Remember the ``group_id`` to be encoded."""
        self.group_id = group_id

    def encode(self):
        """Encode ``group_id`` as a String and pass it to the parent ``encode``."""
        encoded_group = String.encode(self.group_id)
        return super(GroupCoordinatorRequest, self).encode(encoded_group)
299-
300-
301-

0 commit comments

Comments
 (0)