-
-
Notifications
You must be signed in to change notification settings - Fork 32.5k
gh-74028: concurrent.futures.Executor.map
: introduce buffersize
param for lazier behavior
#125663
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
45c3ec5
bfb2c5c
8539663
022b8c6
7ced787
cb5f26e
f46ebe6
eb26e86
0821f95
d95c55b
c80f466
90e6d7c
ab91694
01b8adf
2f8a63f
1fb53a5
365c85d
bf5f838
a0057f1
1aa1275
e0a9a9e
8d6ea97
6124868
c11276f
ebb5337
602968c
b14e368
d37ce09
cdf239c
0a49784
178d6fe
516a94b
ba4ac81
9588059
0427bf1
af88fdf
1fcf3fe
0892b2b
579ba31
332826a
ef814e5
26c8d8d
7b1d5f6
8dac531
bb756f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -606,13 +606,13 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): | |
if timeout is not None: | ||
end_time = timeout + time.monotonic() | ||
|
||
args_iter = iter(zip(*iterables)) | ||
zipped_iterables = iter(zip(*iterables)) | ||
if buffersize: | ||
fs = collections.deque( | ||
self.submit(fn, *args) for args in islice(args_iter, buffersize) | ||
self.submit(fn, *args) for args in islice(zipped_iterables, buffersize) | ||
) | ||
else: | ||
fs = [self.submit(fn, *args) for args in args_iter] | ||
fs = [self.submit(fn, *args) for args in zipped_iterables] | ||
|
||
# Use a weak reference to ensure that the executor can be garbage | ||
# collected independently of the result_iterator closure. | ||
|
@@ -628,7 +628,7 @@ def result_iterator(): | |
if ( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ebonnal I believe you got this part slightly wrong, "off-by-one". IIUC, the number of pending futures cannot be larger than Fortunately, looks like the fix is trivial: you simply have to yield first, next append to the queue: diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index d98b1ebdd58..3b9ccf4d651 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -628,17 +628,17 @@ def result_iterator():
# reverse to keep finishing order
fs.reverse()
while fs:
+ # Careful not to keep a reference to the popped future
+ if timeout is None:
+ yield _result_or_cancel(fs.pop())
+ else:
+ yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
if (
buffersize
and (executor := executor_weakref())
and (args := next(zipped_iterables, None))
):
fs.appendleft(executor.submit(fn, *args))
- # Careful not to keep a reference to the popped future
- if timeout is None:
- yield _result_or_cancel(fs.pop())
- else:
- yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
finally:
for future in fs:
future.cancel() There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @dalcinl, TL;DR: fyi we have #131467 that is open and tackling this "off-by-one" situation. Would be great to get your review there! 🙏🏻 You will notice that it is not "just" moving the yield before the enqueue, let's see why on a simple scenario, explaining the 3 behaviors: Scenario:it = executor.map(fn, iterable, buffersize=buffersize)
# point A
next(it)
# point B
next(it)
# point C "enqueue -> wait -> yield" (current, introduced by this PR) behavior
pro: "wait -> yield -> enqueue" (your proposal) behavior
pro: never exceed "wait -> enqueue -> yield" (#131467) behavior
pros:
Let me know if it makes sense 🙏🏻 |
||
buffersize | ||
and (executor := executor_weakref()) | ||
and (args := next(args_iter, None)) | ||
and (args := next(zipped_iterables, None)) | ||
): | ||
fs.appendleft(executor.submit(fn, *args)) | ||
gpshead marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# Careful not to keep a reference to the popped future | ||
|
Uh oh!
There was an error while loading. Please reload this page.