gh-131466: concurrent.futures.Executor.map: avoid temporarily exceeding buffersize while collecting the next result #131467
base: main
Conversation
Lib/concurrent/futures/_base.py (Outdated)
yield _result_or_cancel(fs.pop(), end_time - time.monotonic())

# Yield the awaited result
yield fs.pop().result()
To be discussed: this could be replaced by a lighter yield fs.pop()._result, because the prior call to _result_or_cancel guarantees that at this point the result is available.
While I understand that we could temporarily exceed buffersize while collecting the next result, is there a real-world use case where it would actually cause an issue? The reason I ask is that we access fs[-1] and then do fs.pop().
I see that we have a del fut in _result_or_cancel(), but can you confirm that it's sufficient to not hold any reference to the yet-to-be-popped future?
Asking Gregory as well since he's the mp expert c:
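For context, _result_or_cancel is a small module-level helper in Lib/concurrent/futures/_base.py. The sketch below is a paraphrase from memory rather than a verbatim copy, but it shows the role of the del fut mentioned above:

```python
def _result_or_cancel(fut, timeout=None):
    try:
        try:
            # Wait for the result (or raise), honouring the optional timeout.
            return fut.result(timeout)
        finally:
            # Cancel the future if it is still pending (e.g. after a timeout).
            fut.cancel()
    finally:
        # Drop the helper's own reference, so that when the caller passes the
        # future in via fs.pop(), no name keeps the future alive afterwards.
        del fut
```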
@picnixz sorry, I re-requested your review because you made me realize that we actually don't need
I'm digging deeper into #95169's context to check whether I missed any untested scenario, especially regarding this:
Yes, that's what I wanted to ask, but I'm not an expert here so I'll let you investigate first c:
Is this PR blocked by the other one, or should I do something in particular?
@ebonnal Sorry for the late reply. What about this simpler and IMHO cleaner way below? The second-to-last line may be a bit controversial (it changes the type of a variable), but I've used that list-pop trick in my

diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index d98b1ebdd58..de34b86d1ee 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -625,21 +625,26 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
         # before the first iterator value is required.
         def result_iterator():
             try:
+                result = None
                 # reverse to keep finishing order
                 fs.reverse()
                 while fs:
+                    # Careful not to keep a reference to the popped future
+                    if timeout is None:
+                        result = _result_or_cancel(fs.pop())
+                    else:
+                        result = _result_or_cancel(fs.pop(), end_time - time.monotonic())
                     if (
                         buffersize
                         and (executor := executor_weakref())
                         and (args := next(zipped_iterables, None))
                     ):
                         fs.appendleft(executor.submit(fn, *args))
-                    # Careful not to keep a reference to the popped future
-                    if timeout is None:
-                        yield _result_or_cancel(fs.pop())
-                    else:
-                        yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
+                    # Careful not to keep a reference to the result
+                    result = [result]
+                    yield result.pop()
             finally:
+                del result
                 for future in fs:
                     future.cancel()
         return result_iterator()
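For what it's worth, here is a hypothetical, standalone illustration (not part of the PR) of why the list-pop trick matters: a generator suspended at a plain yield of a local variable keeps that value alive through its frame, while popping from a single-element list leaves only an empty list behind. The names below are made up, and the observation relies on CPython's reference counting:

```python
import weakref

class Result:
    pass

def yields_local():
    result = Result()
    yield result            # the suspended frame still references `result`

def yields_popped():
    result = [Result()]
    yield result.pop()      # the suspended frame only keeps an empty list

gen = yields_local()
ref = weakref.ref(next(gen))
print(ref() is None)        # False: the generator frame keeps the object alive

gen = yields_popped()
ref = weakref.ref(next(gen))
print(ref() is None)        # True on CPython: nothing references it anymore
```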
Thank you for taking a look @dalcinl!
What exactly do you find unclean in my proposal that justifies creating a list for each yielded element? (fun trick though!)
I guess it is just a matter of subjective taste; my patch looks slightly shorter, but I should say that the primary motivation was avoiding the use of the (conventionally) private _result attribute.

I'm biased, as I maintain a custom implementation of this routine, and I prefer to avoid the use of private APIs and attributes. Standard library modules may not be bound to such constraints.

Long story short, I believe both your proposal and mine are functionally equivalent, so FWIW, this PR has my +1.
I thought that was acceptable because
Fair, actually I remember now that another alternative I had considered in the early days of this PR was:
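A hypothetical sketch of what that alternative might have looked like, reusing the same closure variables as the diff above (this is a guess, not the author's actual snippet): a single reusable list carries each result so the generator frame never keeps a direct reference to a yielded value.

```python
def result_iterator():
    # Hypothetical sketch, not the actual snippet: one list instance is
    # reused as a temporary carrier across iterations.
    result_box = []
    try:
        # reverse to keep finishing order
        fs.reverse()
        while fs:
            if timeout is None:
                result_box.append(_result_or_cancel(fs.pop()))
            else:
                result_box.append(
                    _result_or_cancel(fs.pop(), end_time - time.monotonic()))
            if (
                buffersize
                and (executor := executor_weakref())
                and (args := next(zipped_iterables, None))
            ):
                fs.appendleft(executor.submit(fn, *args))
            yield result_box.pop()
    finally:
        for future in fs:
            future.cancel()
```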
Which is similar to your approach but reuses the same container (a
Thanks again for your review, I appreciate it. Let's wait and gather more feedback 👀!
It looks even better!! I'll borrow your approach for my own code. If you ever update this PR, please do not forget the
Context recap:
If we have:
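A minimal sketch of the kind of setup being discussed, assuming a ThreadPoolExecutor, a squaring worker fn, an input of range(100), and buffersize=8 (none of these specifics appear in the original description, and buffersize requires a Python version whose Executor.map accepts it):

```python
from concurrent.futures import ThreadPoolExecutor

def fn(x):
    # Hypothetical worker function.
    return x * x

with ThreadPoolExecutor(max_workers=4) as executor:
    # buffersize bounds how many tasks may be submitted but not yet
    # collected by the caller at any given time.
    results = executor.map(fn, range(100), buffersize=8)
    for result in results:
        pass  # consume results lazily; map() keeps refilling the buffer
```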
What happens when calling next(results):

1. pull the next arg from iterable and put a task for fn(arg) in the buffer
2. collect the next result

-> During step 2. there are buffersize + 1 buffered tasks.

This PR swaps steps 1. and 2. so that buffersize is never exceeded, even during next.

concurrent.futures.Executor.map temporarily exceeds its buffersize while collecting the next result #131466