-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
Copy pathapi.py
141 lines (108 loc) · 3.72 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
from __future__ import absolute_import
import abc
from kafka.protocol.struct import Struct
from kafka.protocol.types import Int16, Int32, String, Schema, Array, TaggedFields
from kafka.vendor.six import add_metaclass
class RequestHeader(Struct):
    """Request header made of api_key, api_version, correlation_id and client_id."""

    SCHEMA = Schema(
        ('api_key', Int16),
        ('api_version', Int16),
        ('correlation_id', Int32),
        ('client_id', String('utf-8')),
    )

    def __init__(self, request, correlation_id=0, client_id='kafka-python'):
        """Build the header for ``request``.

        Arguments:
            request: object exposing ``API_KEY`` and ``API_VERSION``; both
                values are copied into the header.
            correlation_id: value stored in the ``correlation_id`` field
                (default 0).
            client_id: value stored in the ``client_id`` field
                (default 'kafka-python').
        """
        # Field values in the same order as SCHEMA declares them.
        header_values = (
            request.API_KEY,
            request.API_VERSION,
            correlation_id,
            client_id,
        )
        super(RequestHeader, self).__init__(*header_values)
class RequestHeaderV2(Struct):
    """Request header whose schema ends with a ``tags`` (TaggedFields) entry."""

    # Flexible request / response headers end with a tagged-fields buffer.
    SCHEMA = Schema(
        ('api_key', Int16),
        ('api_version', Int16),
        ('correlation_id', Int32),
        ('client_id', String('utf-8')),
        ('tags', TaggedFields),
    )

    def __init__(self, request, correlation_id=0, client_id='kafka-python', tags=None):
        """Build the header for ``request``.

        Arguments:
            request: object exposing ``API_KEY`` and ``API_VERSION``; both
                values are copied into the header.
            correlation_id: value stored in the ``correlation_id`` field
                (default 0).
            client_id: value stored in the ``client_id`` field
                (default 'kafka-python').
            tags: value stored in the ``tags`` field; any falsy value
                (including the default None) is replaced by an empty dict.
        """
        header_tags = tags if tags else {}
        # Field values in the same order as SCHEMA declares them.
        header_values = (
            request.API_KEY,
            request.API_VERSION,
            correlation_id,
            client_id,
            header_tags,
        )
        super(RequestHeaderV2, self).__init__(*header_values)
class ResponseHeader(Struct):
    """Response header holding a single Int32 ``correlation_id`` field."""

    SCHEMA = Schema(('correlation_id', Int32))
class ResponseHeaderV2(Struct):
    """Response header holding ``correlation_id`` followed by a ``tags`` entry."""

    SCHEMA = Schema(
        ('correlation_id', Int32),
        ('tags', TaggedFields)
    )
@add_metaclass(abc.ABCMeta)
class Request(Struct):
    """Abstract base for api requests.

    Concrete subclasses must provide API_KEY, API_VERSION, SCHEMA and
    RESPONSE_TYPE.
    """

    # When true, build_header() produces a RequestHeaderV2 instead of a
    # RequestHeader.
    FLEXIBLE_VERSION = False

    @abc.abstractproperty
    def API_KEY(self):
        """Integer identifier of the api request"""

    @abc.abstractproperty
    def API_VERSION(self):
        """Integer version of the api request"""

    @abc.abstractproperty
    def SCHEMA(self):
        """Schema() instance describing the request structure"""

    @abc.abstractproperty
    def RESPONSE_TYPE(self):
        """Response class associated with the api request"""

    def expect_response(self):
        """Return True; override when a request does not always generate a response"""
        return True

    def to_object(self):
        """Return this request as a dict keyed by the SCHEMA field names."""
        return _to_object(self.SCHEMA, self)

    def build_header(self, correlation_id, client_id):
        """Return the header struct for this request.

        Arguments:
            correlation_id: forwarded to the header as ``correlation_id``.
            client_id: forwarded to the header as ``client_id``.

        Returns:
            RequestHeaderV2 when FLEXIBLE_VERSION is truthy, otherwise
            RequestHeader.
        """
        header_cls = RequestHeaderV2 if self.FLEXIBLE_VERSION else RequestHeader
        return header_cls(self, correlation_id=correlation_id, client_id=client_id)
@add_metaclass(abc.ABCMeta)
class Response(Struct):
    """Abstract base for api responses.

    Concrete subclasses must provide API_KEY, API_VERSION and SCHEMA.
    """

    # When true, parse_header() decodes a ResponseHeaderV2 instead of a
    # ResponseHeader.
    FLEXIBLE_VERSION = False

    @abc.abstractproperty
    def API_KEY(self):
        """Integer identifier of the api request/response"""

    @abc.abstractproperty
    def API_VERSION(self):
        """Integer version of the api request/response"""

    @abc.abstractproperty
    def SCHEMA(self):
        """Schema() instance describing the response structure"""

    def to_object(self):
        """Return this response as a dict keyed by the SCHEMA field names."""
        return _to_object(self.SCHEMA, self)

    @classmethod
    def parse_header(cls, read_buffer):
        """Decode the response header from ``read_buffer``.

        Arguments:
            read_buffer: passed unchanged to the header class's ``decode``.

        Returns:
            The result of ResponseHeaderV2.decode when FLEXIBLE_VERSION is
            truthy, otherwise the result of ResponseHeader.decode.
        """
        header_cls = ResponseHeaderV2 if cls.FLEXIBLE_VERSION else ResponseHeader
        return header_cls.decode(read_buffer)
def _to_object(schema, data):
    """Convert schema-described data into a plain dict keyed by field name.

    Arguments:
        schema: object exposing parallel ``names`` and ``fields`` sequences.
        data: either a Struct, whose values are read with ``get_item(name)``,
            or an indexable sequence whose values are read by field position.

    Returns:
        dict mapping each field name to its value. Fields typed as a Schema
        are converted recursively, as are the elements of Array fields whose
        ``array_of`` is an Array or Schema. A None value in such an Array
        field (a null array) is returned as None rather than iterated. All
        other values are returned unchanged.
    """
    # The container kind does not change between fields, so check it once.
    is_struct = isinstance(data, Struct)

    obj = {}
    for idx, (name, _type) in enumerate(zip(schema.names, schema.fields)):
        val = data.get_item(name) if is_struct else data[idx]

        if isinstance(_type, Schema):
            obj[name] = _to_object(_type, val)
        elif (
            isinstance(_type, Array)
            and val is not None  # null array: nothing to iterate, keep None
            and isinstance(_type.array_of, (Array, Schema))
        ):
            # NOTE(review): an array_of that is itself an Array is forwarded
            # to _to_object exactly as before; confirm Array exposes the
            # names/fields attributes that call relies on.
            obj[name] = [_to_object(_type.array_of, x) for x in val]
        else:
            obj[name] = val

    return obj