Skip to content

Commit cc9ed8b

Browse files
committed
KAFKA-3388: Fix expiration of batches sitting in the accumulator (dpkp#699)
1 parent b000303 commit cc9ed8b

File tree

1 file changed

+39
-7
lines changed

1 file changed

+39
-7
lines changed

kafka/producer/record_accumulator.py

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def __init__(self, tp, records, message_version=0):
4040
self.record_count = 0
4141
#self.max_record_size = 0 # for metrics only
4242
now = time.time()
43-
#self.created = now # for metrics only
43+
self.created = now
4444
self.drained = None
4545
self.attempts = 0
4646
self.last_attempt = now
@@ -76,10 +76,28 @@ def done(self, base_offset=None, timestamp_ms=None, exception=None):
7676
else:
7777
self.produce_future.failure(exception)
7878

79-
def maybe_expire(self, request_timeout_ms, linger_ms):
80-
since_append_ms = 1000 * (time.time() - self.last_append)
81-
if ((self.records.is_full() and request_timeout_ms < since_append_ms)
82-
or (request_timeout_ms < (since_append_ms + linger_ms))):
79+
def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full):
80+
"""Expire batches if metadata is not available
81+
82+
A batch whose metadata is not available should be expired if one
83+
of the following is true:
84+
85+
* the batch is not in retry AND request timeout has elapsed after
86+
it is ready (full or linger.ms has been reached).
87+
88+
* the batch is in retry AND request timeout has elapsed after the
89+
backoff period ended.
90+
"""
91+
now = time.time()
92+
since_append = now - self.last_append
93+
since_ready = now - (self.created + linger_ms / 1000.0)
94+
since_backoff = now - (self.last_attempt + retry_backoff_ms / 1000.0)
95+
timeout = request_timeout_ms / 1000.0
96+
97+
if ((not self.in_retry() and is_full and timeout < since_append) or
98+
(not self.in_retry() and timeout < since_ready) or
99+
(self.in_retry() and timeout < since_backoff)):
100+
83101
self.records.close()
84102
self.done(-1, None, Errors.KafkaTimeoutError(
85103
"Batch containing %s record(s) expired due to timeout while"
@@ -259,19 +277,33 @@ def abort_expired_batches(self, request_timeout_ms, cluster):
259277
count = 0
260278
for tp in list(self._batches.keys()):
261279
assert tp in self._tp_locks, 'TopicPartition not in locks dict'
280+
281+
# We only check if the batch should be expired if the partition
282+
# does not have a batch in flight. This is to avoid the later
283+
# batches getting expired when an earlier batch is still in progress.
284+
# This protection only takes effect when user sets
285+
# max.in.flight.requests.per.connection=1. Otherwise the expiration
286+
# order is not guaranteed.
287+
if tp in self.muted:
288+
continue
289+
262290
with self._tp_locks[tp]:
263291
# iterate over the batches and expire them if they have stayed
264292
# in accumulator for more than request_timeout_ms
265293
dq = self._batches[tp]
266294
for batch in dq:
295+
is_full = bool(bool(batch != dq[-1]) or batch.records.is_full())
267296
# check if the batch is expired
268297
if batch.maybe_expire(request_timeout_ms,
269-
self.config['linger_ms']):
298+
self.config['retry_backoff_ms'],
299+
self.config['linger_ms'],
300+
is_full):
270301
expired_batches.append(batch)
271302
to_remove.append(batch)
272303
count += 1
273304
self.deallocate(batch)
274-
elif not batch.in_retry():
305+
else:
306+
# Stop at the first batch that has not expired.
275307
break
276308

277309
# Python does not allow us to mutate the dq during iteration

0 commit comments

Comments
 (0)