Skip to content

Commit 22a1b6d

Browse files
committed
Fix accumulator bug: expired batches should be removed from the internal queue
1 parent 2a783d0 commit 22a1b6d

File tree

1 file changed

+10
-0
lines changed

1 file changed

+10
-0
lines changed

kafka/producer/record_accumulator.py

+10
Original file line number | Diff line number | Diff line change
@@ -243,6 +243,7 @@ def abort_expired_batches(self, request_timeout_ms, cluster):
243243
list of RecordBatch that were expired
244244
"""
245245
expired_batches = []
246+
to_remove = []
246247
count = 0
247248
for tp, dq in six.iteritems(self._batches):
248249
assert tp in self._tp_locks, 'TopicPartition not in locks dict'
@@ -254,11 +255,20 @@ def abort_expired_batches(self, request_timeout_ms, cluster):
254255
if batch.maybe_expire(request_timeout_ms,
255256
self.config['linger_ms']):
256257
expired_batches.append(batch)
258+
to_remove.append(batch)
257259
count += 1
258260
self.deallocate(batch)
259261
elif not batch.in_retry():
260262
break
261263

264+
# Python does not allow us to mutate the dq during iteration
265+
# Assuming expired batches are infrequent, this is better than
266+
# creating a new copy of the deque for iteration on every loop
267+
if to_remove:
268+
for batch in to_remove:
269+
dq.remove(batch)
270+
to_remove = []
271+
262272
if expired_batches:
263273
log.debug("Expired %d batches in accumulator", count) # trace
264274

0 commit comments

Comments
 (0)