Skip to content

Commit 08d3308

Browse files
committed
Scratch notes on sansio coordinator
1 parent 3fb0969 commit 08d3308

File tree

3 files changed

+246
-0
lines changed

3 files changed

+246
-0
lines changed

kafka/sansio/__init__.py

Whitespace-only changes.

kafka/sansio/fetcher.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
2+
single partition -> records
3+
4+
5+
multiplex partitions owned by the same broker
6+
7+
8+
conn -> partition-fetcher -> records

kafka/sansio/group.py

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
from __future__ import absolute_import, division, print_function

import time
2+
3+
4+
class GroupState:
    """String tags naming the states of the group-membership state machine.

    The tags are used as the outer keys of
    ``KafkaGroupStateMachine.TRANSITIONS``.
    """

    # Initial state of the state machine, and the state reached after CLOSE.
    UNJOINED = '<unjoined>'

    # Entered when a join (or rejoin) has been requested.
    JOINING = '<joining>'

    # Entered once the join completed, as leader or as follower.
    SYNCING = '<syncing>'

    # Entered after a successful sync; the only state that heartbeats.
    STABLE = '<stable>'

    # NOTE(review): no transition in the visible source uses this state;
    # presumably reserved for "coordinator unavailable" -- confirm.
    NO_BROKER = '<no-broker>'
10+
11+
12+
class GroupInputs:
    """String tags naming the inputs accepted by the group state machine.

    The tags are used as the inner keys of
    ``KafkaGroupStateMachine.TRANSITIONS``.
    """

    BEGIN_JOIN = '<begin-join>'
    JOINED_LEADER = '<joined-leader>'
    JOINED_FOLLOWER = '<joined-follower>'
    SYNCED = '<synced>'
    # The STABLE transition table and the heartbeat bookkeeping both refer to
    # a HEARTBEAT input, so it has to exist here.
    HEARTBEAT = '<heartbeat>'
    UNKNOWN_MEMBER = '<unknown-member>'
    ILLEGAL_GENERATION = '<illegal-generation>'
    REBALANCE_IN_PROGRESS = '<rebalance-in-progress>'
    CLOSE = '<close>'
    # TODO: candidate inputs that are not wired into any transition yet.
    #NOT_COORDINATOR_ERROR = '<not-coordinator-error>'
    #UNKNOWN_ERROR = '<unknown-error>'
    #AUTHORIZATION_ERROR = '<authorization-error>'
    #HEARTBEAT_ERROR = '<heartbeat-error>'
25+
26+
27+
class GroupOutputs:
    """String tags naming the actions the group state machine can request.

    A transition yields one of these tags (or ``None``); the coordinator
    dispatches on the tag to perform the matching action.
    """

    SEND_INIT_JOIN = '<send-init-join>'
    SEND_REJOIN = '<send-rejoin>'
    SEND_SYNC_LEADER = '<send-sync-leader>'
    SEND_SYNC_FOLLOWER = '<send-sync-follower>'
    SEND_LEAVE_GROUP = '<send-leave-group>'
    # The coordinator's dispatcher compares against these two tags, so they
    # must be defined even though no transition emits them yet.
    START_HEARTBEAT = '<start-heartbeat>'
    STOP_HEARTBEAT = '<stop-heartbeat>'
    FATAL_ERROR = '<fatal-error>'
34+
35+
"""
36+
class RequestBuilder:
37+
INIT_JOIN -> (group_id, session_timeout_ms, max_poll_interval_ms, protocol_type, [(assignor, metadata), ...])
38+
REJOIN -> (group_id, session_timeout_ms, max_poll_interval_ms, member_id, protocol_type, [(assignor, metadata), ...])
39+
SYNC_LEADER -> (group_id, generation_id, member_id, leader_id, group_protocol, group_assignment) [(member_id, metadata), ...],
40+
SYNC_FOLLOWER -> (group_id, generation_id, member_id)
41+
42+
group_id
43+
session_timeout_ms
44+
max_poll_interval_ms
45+
protocol_type
46+
"""
47+
48+
49+
class KafkaGroupStateMachine:
    """Table-driven state machine for one member's group-membership lifecycle.

    The machine performs no I/O: callers feed it ``GroupInputs`` tags through
    ``process_input`` and act on the ``GroupOutputs`` tag it returns.
    """

    # State -> Input -> (New State, Output)
    TRANSITIONS = {
        GroupState.UNJOINED: {
            GroupInputs.BEGIN_JOIN: (GroupState.JOINING, GroupOutputs.SEND_INIT_JOIN),
        },
        GroupState.JOINING: {
            GroupInputs.JOINED_LEADER: (GroupState.SYNCING, GroupOutputs.SEND_SYNC_LEADER),
            GroupInputs.JOINED_FOLLOWER: (GroupState.SYNCING, GroupOutputs.SEND_SYNC_FOLLOWER),
            GroupInputs.UNKNOWN_MEMBER: (GroupState.JOINING, GroupOutputs.SEND_INIT_JOIN),
        },
        GroupState.SYNCING: {
            GroupInputs.SYNCED: (GroupState.STABLE, None),
            GroupInputs.UNKNOWN_MEMBER: (GroupState.JOINING, GroupOutputs.SEND_INIT_JOIN),
            GroupInputs.ILLEGAL_GENERATION: (GroupState.JOINING, GroupOutputs.SEND_REJOIN),
            GroupInputs.REBALANCE_IN_PROGRESS: (GroupState.JOINING, GroupOutputs.SEND_REJOIN),
        },
        GroupState.STABLE: {
            GroupInputs.BEGIN_JOIN: (GroupState.JOINING, GroupOutputs.SEND_REJOIN),
            GroupInputs.HEARTBEAT: (GroupState.STABLE, None),
            GroupInputs.CLOSE: (GroupState.UNJOINED, GroupOutputs.SEND_LEAVE_GROUP),
            GroupInputs.UNKNOWN_MEMBER: (GroupState.JOINING, GroupOutputs.SEND_INIT_JOIN),
            GroupInputs.ILLEGAL_GENERATION: (GroupState.JOINING, GroupOutputs.SEND_REJOIN),
            GroupInputs.REBALANCE_IN_PROGRESS: (GroupState.JOINING, GroupOutputs.SEND_REJOIN),
        },
    }

    def __init__(self, group_id=None, session_timeout_ms=10000,
                 protocol_type='consumer'):
        """Create a machine in the UNJOINED state.

        All parameters are optional so the machine can still be constructed
        with no arguments.

        Args:
            group_id: identifier of the group; stored, not interpreted here.
            session_timeout_ms: session timeout in milliseconds; stored only.
            protocol_type: group protocol type; stored only.
        """
        self.state = GroupState.UNJOINED
        self.group_id = group_id
        self.member_id = ''
        self.session_timeout_ms = session_timeout_ms
        self.protocol_type = protocol_type
        # Seconds: compared against differences of time.time() readings.
        self.heartbeat_interval = 0.5
        # -1 means no heartbeat recorded yet, so the first one is always due.
        self.last_heartbeat = -1

    def process_input(self, input_):
        """Apply ``input_`` to the current state and return the output tag.

        Args:
            input_: one of the ``GroupInputs`` tags.

        Returns:
            The ``GroupOutputs`` tag attached to the transition, or ``None``
            when the transition requests no action.

        Raises:
            ValueError: if the current state has no transition for ``input_``.
        """
        try:
            new_state, output = self.TRANSITIONS[self.state][input_]
        except KeyError:
            raise ValueError('Unrecognized state/input')
        self.state = new_state
        # HEARTBEAT is an input tag; record when it was last seen so that
        # should_heartbeat() can pace the next one.
        if input_ == GroupInputs.HEARTBEAT:
            self.last_heartbeat = time.time()
        return output

    def should_heartbeat(self):
        """Return True when the member is STABLE and a heartbeat is due.

        A heartbeat is due once at least ``heartbeat_interval`` seconds have
        passed since the last recorded one.
        """
        if self.state != GroupState.STABLE:
            return False
        return time.time() >= self.last_heartbeat + self.heartbeat_interval

    def join(self, protocols):
        """Placeholder; currently does nothing."""
        # protocols: list of (protocol, metadata) tuples
        # but this is only needed in connection, not FSM
        pass

    def assign(self, group_assignment):
        """Placeholder; currently does nothing."""
        pass

    def leave(self):
        """Placeholder; currently does nothing."""
        pass
114+
115+
116+
class Coordinator:
    """Glue between a wire protocol object, the group state machine and a transport.

    Received bytes are decoded by ``protocol``, turned into input events, fed
    to ``fsm``, and whatever output the state machine returns is dispatched to
    an action method.

    NOTE(review): ``response_to_event`` and the ``send_*`` dispatch targets
    are not defined on this class in the visible source; presumably they are
    meant to be supplied by a subclass or delegated to the group object --
    confirm before relying on them.
    """

    def __init__(self, protocol, fsm):
        """Store the collaborators; the transport is attached later.

        Args:
            protocol: object whose ``receive_bytes(data)`` yields pairs whose
                second element is a decoded response.
            fsm: object whose ``process_input(event)`` returns an output tag,
                an iterable of tags, or ``None``.
        """
        self.protocol = protocol
        self.fsm = fsm
        self.transport = None

    def connection_made(self, transport):
        """Remember the transport once a connection exists."""
        self.transport = transport

    def receive_bytes(self, data):
        """Decode ``data`` and feed each resulting response to the state machine."""
        for _, response in self.protocol.receive_bytes(data):
            self.receive_event(self.response_to_event(response))

    def receive_event(self, event):
        """Feed one input event to the state machine and dispatch its output."""
        # would an Event class work better here?
        # Could be ApiResponseEvent, LocalEvent (perform assignment, start heartbeat)
        output = self.fsm.process_input(event)
        if output is None:
            # The transition requested no action.
            return
        if isinstance(output, (list, tuple)):
            for output_event in output:
                self.process_event(output_event)
        else:
            # A single tag is a string; iterating it would dispatch on
            # individual characters, so handle it as one event.
            self.process_event(output)

    def process_event(self, event):
        """Run the action that corresponds to one output tag.

        Unrecognized tags are ignored.

        Raises:
            Exception: when ``event`` is ``GroupOutputs.FATAL_ERROR``.
        """
        if event == GroupOutputs.SEND_INIT_JOIN:
            self.send_init_join()
        elif event == GroupOutputs.SEND_REJOIN:
            self.send_rejoin()
        elif event == GroupOutputs.SEND_SYNC_LEADER:
            self.send_sync_leader()
        elif event == GroupOutputs.SEND_SYNC_FOLLOWER:
            self.send_sync_follower()
        elif event == GroupOutputs.SEND_LEAVE_GROUP:
            self.send_leave()
        elif event == GroupOutputs.FATAL_ERROR:
            raise Exception(event)
        elif event == GroupOutputs.START_HEARTBEAT:
            self.cb_start_heartbeat()
        elif event == GroupOutputs.STOP_HEARTBEAT:
            self.cb_stop_heartbeat()

    def join(self):
        """Ask the state machine to begin joining the group.

        Returns nothing; any request is produced through event dispatch.
        """
        self.receive_event(GroupInputs.BEGIN_JOIN)

    def rejoin(self):
        """Placeholder for rejoining the group; currently does nothing."""
        # call this for offsetcommitresponse?
        pass

    def leave(self):
        """Leave Group (placeholder; currently does nothing)."""
        pass

    def cb_do_assignment(self):
        """Callback placeholder; currently does nothing."""
        pass

    def cb_start_heartbeat(self):
        """Callback placeholder; currently does nothing."""
        pass

    def cb_stop_heartbeat(self):
        """Callback placeholder; currently does nothing."""
        pass

    def offset_commits(self):
        """Placeholder; currently does nothing."""
        # TODO: open question from the original sketch -- do offset commits
        # belong on the coordinator at all?
        pass
181+
182+
class CoordinatorGroup:
    """Holds group settings and member identity, and builds group API requests.

    NOTE(review): ``JoinGroupRequest``, ``SyncGroupRequest``,
    ``HeartbeatRequest``, ``group_protocols`` and ``cb_do_assignment`` are not
    defined or imported in the visible source; presumably they come from the
    package's protocol module and from a subclass or collaborator -- confirm.
    """

    def __init__(self, group_id, session_timeout_ms=10000, protocol_type='consumer'):
        """Record the group settings; member id and generation start unset.

        Args:
            group_id: identifier of the group to join.
            session_timeout_ms: session timeout sent in join requests.
            protocol_type: protocol type sent in join requests.
        """
        self.group_id = group_id
        self.member_id = b''
        self.generation = None
        self.session_timeout_ms = session_timeout_ms
        self.protocol_type = protocol_type

    def _join_request(self, member_id):
        """Build a v0 join request that identifies this member as ``member_id``."""
        protocols = [
            (protocol,
             metadata if isinstance(metadata, bytes) else metadata.encode())
            for protocol, metadata in self.group_protocols()
        ]
        return JoinGroupRequest[0](
            self.group_id,
            self.session_timeout_ms,
            member_id,
            self.protocol_type,
            protocols)

    def send_init_join(self):
        """Return a join request for a member that has no member id yet."""
        # An initial join carries no member id, so send the empty id and not
        # whatever id may be left over from an earlier generation.
        return self._join_request(b'')

    def send_rejoin(self):
        """Return a join request that carries the current member id."""
        return self._join_request(self.member_id)

    def send_sync_leader(self):
        """Return the leader's sync request, carrying the computed assignment."""
        # assumes cb_do_assignment() returns [(member_id, assignment_bytes), ...]
        # -- TODO confirm
        assignments = self.cb_do_assignment()
        return SyncGroupRequest[0](
            self.group_id,
            self.generation,
            self.member_id,
            assignments)

    def send_sync_follower(self):
        """Return a follower's sync request, which carries no assignments."""
        return SyncGroupRequest[0](
            self.group_id,
            self.generation,
            self.member_id,
            [])

    def send_heartbeat(self):
        """Return a heartbeat request for the current generation and member."""
        return HeartbeatRequest[0](
            self.group_id,
            self.generation,
            self.member_id)
219+
220+
221+
coordinator
222+
coordinator-transport: manages socket connection, passes received_bytes to protocol
223+
coordinator-protocol:
224+
events in, events out?
225+
226+
public methods (join, leave, heartbeat) pass input_events to coordinator-FSM, return nothing?
227+
228+
passes received_bytes to kafka-protocol,
229+
passes received_events to event_to_input
230+
passes input_event to coordinator-FSM
231+
convert output_events to dispatch functions + call
232+
233+
dispatch functions generate api events
234+
api events passed to kafka_protocol
235+
bytes_to_send passed to transport
236+
237+
coordinator-group: manages group settings, member state, generation, creates api requests
238+
coordinator-fsm

0 commit comments

Comments
 (0)