Skip to content

Commit 799f53f

Browse files
committed
Fix bug in SimpleBufferPool memory condition waiting / timeout
1 parent 72fa7ef commit 799f53f

File tree

1 file changed

+5
-4
lines changed

1 file changed

+5
-4
lines changed

kafka/producer/buffer.py

+5-4
Original file line numberDiff line numberDiff line change
@@ -158,15 +158,16 @@ def allocate(self, size, max_time_to_block_ms):
158158
# enough memory to allocate one
159159
while buf is None:
160160
start_wait = time.time()
161-
if not more_memory.wait(max_time_to_block_ms / 1000.0):
162-
raise Errors.KafkaTimeoutError(
163-
"Failed to allocate memory within the configured"
164-
" max blocking time")
161+
more_memory.wait(max_time_to_block_ms / 1000.0)
165162
end_wait = time.time()
166163
#this.waitTime.record(endWait - startWait, time.milliseconds());
167164

168165
if self._free:
169166
buf = self._free.popleft()
167+
else:
168+
raise Errors.KafkaTimeoutError(
169+
"Failed to allocate memory within the configured"
170+
" max blocking time")
170171

171172
# remove the condition for this thread to let the next thread
172173
# in line start getting memory

0 commit comments

Comments
 (0)