@@ -40,7 +40,7 @@ def __init__(self, tp, records, message_version=0):
40
40
self .record_count = 0
41
41
#self.max_record_size = 0 # for metrics only
42
42
now = time .time ()
43
- # self.created = now # for metrics only
43
+ self .created = now
44
44
self .drained = None
45
45
self .attempts = 0
46
46
self .last_attempt = now
@@ -76,10 +76,28 @@ def done(self, base_offset=None, timestamp_ms=None, exception=None):
76
76
else :
77
77
self .produce_future .failure (exception )
78
78
79
- def maybe_expire (self , request_timeout_ms , linger_ms ):
80
- since_append_ms = 1000 * (time .time () - self .last_append )
81
- if ((self .records .is_full () and request_timeout_ms < since_append_ms )
82
- or (request_timeout_ms < (since_append_ms + linger_ms ))):
79
+ def maybe_expire (self , request_timeout_ms , retry_backoff_ms , linger_ms , is_full ):
80
+ """Expire batches if metadata is not available
81
+
82
+ A batch whose metadata is not available should be expired if one
83
+ of the following is true:
84
+
85
+ * the batch is not in retry AND request timeout has elapsed after
86
+ it is ready (full or linger.ms has been reached).
87
+
88
+ * the batch is in retry AND request timeout has elapsed after the
89
+ backoff period ended.
90
+ """
91
+ now = time .time ()
92
+ since_append = now - self .last_append
93
+ since_ready = now - (self .created + linger_ms / 1000.0 )
94
+ since_backoff = now - (self .last_attempt + retry_backoff_ms / 1000.0 )
95
+ timeout = request_timeout_ms / 1000.0
96
+
97
+ if ((not self .in_retry () and is_full and timeout < since_append ) or
98
+ (not self .in_retry () and timeout < since_ready ) or
99
+ (self .in_retry () and timeout < since_backoff )):
100
+
83
101
self .records .close ()
84
102
self .done (- 1 , None , Errors .KafkaTimeoutError (
85
103
"Batch containing %s record(s) expired due to timeout while"
@@ -259,19 +277,33 @@ def abort_expired_batches(self, request_timeout_ms, cluster):
259
277
count = 0
260
278
for tp in list (self ._batches .keys ()):
261
279
assert tp in self ._tp_locks , 'TopicPartition not in locks dict'
280
+
281
+ # We only check if the batch should be expired if the partition
282
+ # does not have a batch in flight. This prevents later batches
283
+ # from being expired while an earlier batch is still in progress.
284
+ # This protection only takes effect when user sets
285
+ # max.in.flight.requests.per.connection=1. Otherwise the expiration
286
+ # order is not guaranteed.
287
+ if tp in self .muted :
288
+ continue
289
+
262
290
with self ._tp_locks [tp ]:
263
291
# iterate over the batches and expire them if they have stayed
264
292
# in accumulator for more than request_timeout_ms
265
293
dq = self ._batches [tp ]
266
294
for batch in dq :
295
+ is_full = bool (bool (batch != dq [- 1 ]) or batch .records .is_full ())
267
296
# check if the batch is expired
268
297
if batch .maybe_expire (request_timeout_ms ,
269
- self .config ['linger_ms' ]):
298
+ self .config ['retry_backoff_ms' ],
299
+ self .config ['linger_ms' ],
300
+ is_full ):
270
301
expired_batches .append (batch )
271
302
to_remove .append (batch )
272
303
count += 1
273
304
self .deallocate (batch )
274
- elif not batch .in_retry ():
305
+ else :
306
+ # Stop at the first batch that has not expired.
275
307
break
276
308
277
309
# Python does not allow us to mutate the dq during iteration
0 commit comments