We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent d3d3ad8 commit f6d2168Copy full SHA for f6d2168
kafka/coordinator/base.py
@@ -242,6 +242,14 @@ def ensure_active_group(self):
242
while self.need_rejoin():
243
self.ensure_coordinator_known()
244
245
+ # ensure that there are no pending requests to the coordinator.
246
+ # This is important in particular to avoid resending a pending
247
+ # JoinGroup request.
248
+ if self._client.in_flight_request_count(self.coordinator_id):
249
+ while self._client.in_flight_request_count(self.coordinator_id):
250
+ self._client.poll()
251
+ continue
252
+
253
future = self._send_join_group_request()
254
self._client.poll(future=future)
255
0 commit comments