Skip to content

Commit f6d2168

Browse files
authored
Drain pending requests to the coordinator before initiating group rejoin (dpkp#798)
1 parent d3d3ad8 commit f6d2168

File tree

1 file changed

+8
-0
lines changed

1 file changed

+8
-0
lines changed

kafka/coordinator/base.py

+8
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,14 @@ def ensure_active_group(self):
242242
while self.need_rejoin():
243243
self.ensure_coordinator_known()
244244

245+
# ensure that there are no pending requests to the coordinator.
246+
# This is important in particular to avoid resending a pending
247+
# JoinGroup request.
248+
if self._client.in_flight_request_count(self.coordinator_id):
249+
while self._client.in_flight_request_count(self.coordinator_id):
250+
self._client.poll()
251+
continue
252+
245253
future = self._send_join_group_request()
246254
self._client.poll(future=future)
247255

0 commit comments

Comments
 (0)