gh-74028: concurrent.futures.Executor.map: introduce buffersize param for lazier behavior #125663

Merged on Mar 13, 2025: 45 commits from fix-issue-29842 into main.

Commits
45c3ec5
bpo-29842: concurrent.futures.Executor.map: add buffersize param for …
ebonnal Oct 12, 2024
bfb2c5c
test_map_buffersize: 1s sleep
ebonnal Oct 17, 2024
8539663
mention chunksize in ProcessPoolExecutor's buffersize docstring
ebonnal Oct 18, 2024
022b8c6
merge unittest into ExecutorTest
ebonnal Oct 18, 2024
7ced787
fix versionchanged
ebonnal Oct 18, 2024
cb5f26e
📜🤖 Added by blurb_it.
blurb-it[bot] Oct 18, 2024
f46ebe6
fix tests determinism
ebonnal Oct 18, 2024
eb26e86
add test_map_with_buffersize_on_empty_iterable
ebonnal Oct 18, 2024
0821f95
allow timeout + buffersize
ebonnal Oct 18, 2024
d95c55b
lint import
ebonnal Oct 18, 2024
c80f466
tests: polish
ebonnal Oct 18, 2024
90e6d7c
rephrase docstring
ebonnal Oct 25, 2024
ab91694
fix Doc/library/concurrent.futures.rst
ebonnal Dec 3, 2024
01b8adf
reorder imports
ebonnal Dec 3, 2024
2f8a63f
rephrase buffersize's ValueError
ebonnal Dec 3, 2024
1fb53a5
update 3.14.rst
ebonnal Dec 3, 2024
365c85d
edit docstring
ebonnal Dec 3, 2024
bf5f838
lint
ebonnal Dec 3, 2024
a0057f1
lint
ebonnal Dec 3, 2024
1aa1275
comment on weakref
ebonnal Dec 4, 2024
e0a9a9e
lint
ebonnal Dec 4, 2024
8d6ea97
test_map_with_buffersize_when_buffer_becomes_full: avoid using multip…
ebonnal Dec 4, 2024
6124868
lint
ebonnal Dec 4, 2024
c11276f
test_map_with_buffersize_and_timeout: avoid sleeping 0 seconds for win32
ebonnal Dec 4, 2024
ebb5337
remove test_map_with_buffersize_and_timeout that does not improve cove…
ebonnal Dec 4, 2024
602968c
extend unittesting to no and multiple input iterables
ebonnal Dec 5, 2024
b14e368
Update Lib/concurrent/futures/_base.py
ebonnal Dec 5, 2024
d37ce09
rename args_iter -> zipped_iterables
ebonnal Dec 5, 2024
cdf239c
remove period at end of error message
ebonnal Dec 10, 2024
0a49784
unit tests: merge into a single test method with test messages
ebonnal Dec 16, 2024
178d6fe
apply review on tests format
ebonnal Dec 16, 2024
516a94b
Update Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d…
ebonnal Dec 16, 2024
ba4ac81
Update Doc/whatsnew/3.14.rst
ebonnal Dec 16, 2024
9588059
use assertListEqual
ebonnal Dec 16, 2024
0427bf1
test_map_buffersize_validation: test negative buffersize
ebonnal Dec 16, 2024
af88fdf
explicitly checks buffersize's type and add test_map_buffersize_type_…
ebonnal Dec 16, 2024
1fcf3fe
test_map_buffersize_on_infinite_iterable: fetch the first 4 elements
ebonnal Dec 18, 2024
0892b2b
add `test_map_buffersize_on_multiple_infinite_iterables`
ebonnal Dec 25, 2024
579ba31
doc: specify that it is the size of a buffer of tasks and not results
ebonnal Jan 10, 2025
332826a
Merge remote-tracking branch 'cpython/main' into fix-issue-29842
ebonnal Feb 24, 2025
ef814e5
Update Doc/whatsnew/3.14.rst
ebonnal Feb 24, 2025
26c8d8d
Merge branch 'main' into fix-issue-29842
encukou Mar 3, 2025
7b1d5f6
remove redundant `iter`
ebonnal Mar 3, 2025
8dac531
Merge remote-tracking branch 'cpython/main' into fix-issue-29842
ebonnal Mar 10, 2025
bb756f4
add new line in 3.14.rst
ebonnal Mar 12, 2025
12 changes: 9 additions & 3 deletions Doc/library/concurrent.futures.rst
@@ -40,11 +40,14 @@ Executor Objects
future = executor.submit(pow, 323, 1235)
print(future.result())

.. method:: map(fn, *iterables, timeout=None, chunksize=1)
.. method:: map(fn, *iterables, timeout=None, chunksize=1, buffersize=None)

Similar to :func:`map(fn, *iterables) <map>` except:

* the *iterables* are collected immediately rather than lazily;
* The *iterables* are collected immediately rather than lazily, unless a
*buffersize* is specified to limit the number of submitted tasks whose
results have not yet been yielded. If the buffer is full, iteration over
the *iterables* pauses until a result is yielded from the buffer.

* *fn* is executed asynchronously and several calls to
*fn* may be made concurrently.
@@ -68,7 +71,10 @@ Executor Objects
*chunksize* has no effect.

.. versionchanged:: 3.5
Added the *chunksize* argument.
Added the *chunksize* parameter.

.. versionchanged:: next
Added the *buffersize* parameter.

.. method:: shutdown(wait=True, *, cancel_futures=False)

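The buffering semantics documented above can be sketched without the new parameter, using plain submit() and a deque. This is an illustrative stand-in for any Python version; `buffered_map` is a hypothetical helper, not part of the stdlib:

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from itertools import count, islice

def buffered_map(executor, fn, iterable, buffersize):
    # Keep at most `buffersize` submitted-but-not-yet-yielded tasks,
    # mirroring the "enqueue -> wait -> yield" behavior of this PR.
    args = iter(iterable)
    buffer = deque(executor.submit(fn, a) for a in islice(args, buffersize))
    while buffer:
        # Top up the buffer with one task before yielding, if input remains.
        for a in islice(args, 1):
            buffer.append(executor.submit(fn, a))
        yield buffer.popleft().result()

with ThreadPoolExecutor(max_workers=2) as executor:
    # Safe on an infinite iterable: only a bounded number of tasks exist
    # at any time, instead of an eager, never-terminating collection.
    results = buffered_map(executor, str, count(), buffersize=2)
    first_four = list(islice(results, 4))

print(first_four)  # ['0', '1', '2', '3']
```

Results are yielded in submission order, matching map()'s ordering guarantee.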
7 changes: 7 additions & 0 deletions Doc/whatsnew/3.14.rst
@@ -461,6 +461,13 @@ contextvars
* Support context manager protocol by :class:`contextvars.Token`.
(Contributed by Andrew Svetlov in :gh:`129889`.)

* Add the optional ``buffersize`` parameter to
:meth:`concurrent.futures.Executor.map` to limit the number of submitted
tasks whose results have not yet been yielded. If the buffer is full,
iteration over the *iterables* pauses until a result is yielded from the
buffer.
(Contributed by Enzo Bonnal and Josh Rosenberg in :gh:`74028`.)


ctypes
------
32 changes: 30 additions & 2 deletions Lib/concurrent/futures/_base.py
@@ -8,6 +8,8 @@
import threading
import time
import types
import weakref
from itertools import islice

FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
@@ -572,7 +574,7 @@ def submit(self, fn, /, *args, **kwargs):
"""
raise NotImplementedError()

def map(self, fn, *iterables, timeout=None, chunksize=1):
def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
"""Returns an iterator equivalent to map(fn, iter).

Args:
@@ -584,6 +586,11 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
before being passed to a child process. This argument is only
used by ProcessPoolExecutor; it is ignored by
ThreadPoolExecutor.
buffersize: The number of submitted tasks whose results have not
yet been yielded. If the buffer is full, iteration over the
iterables pauses until a result is yielded from the buffer.
If None, all input elements are eagerly collected, and a task is
submitted for each.

Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
@@ -594,10 +601,25 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
if buffersize is not None and not isinstance(buffersize, int):
raise TypeError("buffersize must be an integer or None")
if buffersize is not None and buffersize < 1:
raise ValueError("buffersize must be None or > 0")

if timeout is not None:
end_time = timeout + time.monotonic()

fs = [self.submit(fn, *args) for args in zip(*iterables)]
zipped_iterables = zip(*iterables)
if buffersize:
fs = collections.deque(
self.submit(fn, *args) for args in islice(zipped_iterables, buffersize)
)
else:
fs = [self.submit(fn, *args) for args in zipped_iterables]

# Use a weak reference to ensure that the executor can be garbage
# collected independently of the result_iterator closure.
executor_weakref = weakref.ref(self)

# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
@@ -606,6 +628,12 @@ def result_iterator():
# reverse to keep finishing order
fs.reverse()
while fs:
if (
Review comment from dalcinl (Contributor):
@ebonnal I believe you got this part slightly wrong, an "off-by-one". IIUC, the number of pending futures cannot be larger than buffersize. However, after the initial submission of buffersize tasks, here in this branch you append an EXTRA task to the queue, so you now have buffersize + 1 tasks whose results have potentially not been yielded yet.

Fortunately, the fix looks trivial: you simply have to yield first, then append to the queue:

diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index d98b1ebdd58..3b9ccf4d651 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -628,17 +628,17 @@ def result_iterator():
                 # reverse to keep finishing order
                 fs.reverse()
                 while fs:
+                    # Careful not to keep a reference to the popped future
+                    if timeout is None:
+                        yield _result_or_cancel(fs.pop())
+                    else:
+                        yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
                     if (
                         buffersize
                         and (executor := executor_weakref())
                         and (args := next(zipped_iterables, None))
                     ):
                         fs.appendleft(executor.submit(fn, *args))
-                    # Careful not to keep a reference to the popped future
-                    if timeout is None:
-                        yield _result_or_cancel(fs.pop())
-                    else:
-                        yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
             finally:
                 for future in fs:
                     future.cancel()

Reply from ebonnal (Contributor, Author), Apr 28, 2025:

Hi @dalcinl,

TL;DR: FYI, we have #131467 open that tackles this "off-by-one" situation. Would be great to get your review there! 🙏🏻
I did not propose that variation at first because I think it makes sense as an optional follow-up PR, given that it integrates slightly less smoothly into the existing logic.

You will notice that it is not "just" moving the yield before the enqueue; let's see why with a simple scenario, comparing the 3 behaviors:

Scenario:

it = executor.map(fn, iterable, buffersize=buffersize)
# point A
next(it)
# point B
next(it)
# point C

"enqueue -> wait -> yield" (current, introduced by this PR) behavior

  • buffers buffersize tasks
  • at # point A there are buffersize buffered tasks
  • 1st call to next:
    • enqueue a task, jumping to buffersize + 1 tasks in the buffer
    • wait for the next result to be available
    • yield the next result, finally going down to buffersize tasks in the buffer
  • at # point B there are still buffersize buffered tasks
  • 2nd call to next: same

pro: buffersize tasks in the buffer between two calls to next
con: while waiting for the next result, we have buffersize + 1 tasks in the buffer

"wait -> yield -> enqueue" (your proposal) behavior

  • buffers buffersize tasks
  • at # point A there are buffersize buffered tasks
  • call to next:
    • wait for the next result to be available
    • yield this result, going down to buffersize - 1 tasks in the buffer
  • at # point B there are buffersize - 1 buffered tasks
  • call to next:
    • enqueue a task, jumping back to buffersize tasks in the buffer
    • wait for the next result to be available
    • yield the next result, going down to buffersize - 1 tasks in the buffer
  • at # point C there are still buffersize - 1 buffered tasks

pro: never exceeds buffersize
con: between two calls to next we have only buffersize - 1 tasks in the buffer

"wait -> enqueue -> yield" (#131467) behavior

  • buffers buffersize tasks
  • at # point A there are buffersize buffered tasks
  • 1st call to next:
    • wait for the next result to be available
    • enqueue a task, jumping to buffersize + 1 tasks in the buffer
    • yield the already-available result without needing to wait, going back to buffersize tasks in the buffer
  • at # point B there are still buffersize buffered tasks
  • 2nd call to next: same

pros:

  • buffersize tasks in the buffer between two calls to next
  • jumps to buffersize + 1 during a call to next, but goes back instantly to buffersize because the next result is already available when we enqueue the next task (the closest we can get to a yield-and-enqueue-at-the-same-time).

Let me know if it makes sense 🙏🏻

buffersize
and (executor := executor_weakref())
and (args := next(zipped_iterables, None))
):
fs.appendleft(executor.submit(fn, *args))
# Careful not to keep a reference to the popped future
if timeout is None:
yield _result_or_cancel(fs.pop())
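The off-by-one discussion above can be checked with a toy synchronous model that tracks the peak number of buffered items under the two orderings. This is a sketch with no executor involved; `simulate` is a hypothetical helper for illustration only:

```python
from collections import deque
from itertools import islice

def simulate(order, buffersize, n_items):
    # Toy synchronous model of the submit/yield orderings discussed above;
    # returns (yielded items, peak number of buffered items).
    items = iter(range(n_items))
    buf = deque(islice(items, buffersize))  # initial buffer fill
    peak = len(buf)
    out = []
    while buf:
        if order == "enqueue-then-yield":   # behavior introduced by this PR
            buf.extend(islice(items, 1))    # enqueue one replacement first
            peak = max(peak, len(buf))
            out.append(buf.popleft())       # then yield
        else:                               # the "yield-then-enqueue" proposal
            out.append(buf.popleft())       # yield first
            buf.extend(islice(items, 1))    # then enqueue one replacement
            peak = max(peak, len(buf))
    return out, peak

print(simulate("enqueue-then-yield", 2, 10)[1])  # 3, i.e. buffersize + 1
print(simulate("yield-then-enqueue", 2, 10)[1])  # 2, never exceeds buffersize
```

Both orderings yield the same items in the same order; they differ only in the momentary peak occupancy of the buffer.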
10 changes: 8 additions & 2 deletions Lib/concurrent/futures/process.py
@@ -813,7 +813,7 @@ def submit(self, fn, /, *args, **kwargs):
return f
submit.__doc__ = _base.Executor.submit.__doc__

def map(self, fn, *iterables, timeout=None, chunksize=1):
def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
"""Returns an iterator equivalent to map(fn, iter).

Args:
@@ -824,6 +824,11 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
chunksize: If greater than one, the iterables will be chopped into
chunks of size chunksize and submitted to the process pool.
If set to one, the items in the list will be sent one at a time.
buffersize: The number of submitted tasks whose results have not
yet been yielded. If the buffer is full, iteration over the
iterables pauses until a result is yielded from the buffer.
If None, all input elements are eagerly collected, and a task is
submitted for each.

Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
@@ -839,7 +844,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):

results = super().map(partial(_process_chunk, fn),
itertools.batched(zip(*iterables), chunksize),
timeout=timeout)
timeout=timeout,
buffersize=buffersize)
return _chain_from_iterable_of_lists(results)

def shutdown(self, wait=True, *, cancel_futures=False):
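Because ProcessPoolExecutor delegates to the base map() over batched input, the buffered unit there is a chunk of chunksize items, so up to chunksize * buffersize input elements may be consumed eagerly. A rough illustration of that consumption arithmetic; `counting` is an illustrative stand-in, and `batched` mimics itertools.batched (available since Python 3.12):

```python
from itertools import islice

def batched(iterable, n):
    # Minimal stand-in for itertools.batched (Python 3.12+).
    it = iter(iterable)
    while batch := tuple(islice(it, n)):
        yield batch

consumed = 0

def counting(n):
    # Instrumented input iterable: records how many items were pulled.
    global consumed
    for i in range(n):
        consumed += 1
        yield i

chunksize, buffersize = 3, 2
chunks = batched(counting(100), chunksize)
# What the base-class map() would pre-submit: the first `buffersize` chunks.
prefilled = list(islice(chunks, buffersize))

print(prefilled)  # [(0, 1, 2), (3, 4, 5)]
print(consumed)   # 6 == chunksize * buffersize
```

Only 6 of the 100 input items are pulled up front; the rest are consumed as results are yielded.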
70 changes: 70 additions & 0 deletions Lib/test/test_concurrent_futures/executor.py
@@ -1,7 +1,9 @@
import itertools
import threading
import time
import weakref
from concurrent import futures
from operator import add
from test import support
from test.support import Py_GIL_DISABLED

@@ -71,6 +73,74 @@ def test_map_timeout(self):

self.assertEqual([None, None], results)

def test_map_buffersize_type_validation(self):
for buffersize in ("foo", 2.0):
with self.subTest(buffersize=buffersize):
with self.assertRaisesRegex(
TypeError,
"buffersize must be an integer or None",
):
self.executor.map(str, range(4), buffersize=buffersize)

def test_map_buffersize_value_validation(self):
for buffersize in (0, -1):
with self.subTest(buffersize=buffersize):
with self.assertRaisesRegex(
ValueError,
"buffersize must be None or > 0",
):
self.executor.map(str, range(4), buffersize=buffersize)

def test_map_buffersize(self):
ints = range(4)
for buffersize in (1, 2, len(ints), len(ints) * 2):
with self.subTest(buffersize=buffersize):
res = self.executor.map(str, ints, buffersize=buffersize)
self.assertListEqual(list(res), ["0", "1", "2", "3"])

def test_map_buffersize_on_multiple_iterables(self):
ints = range(4)
for buffersize in (1, 2, len(ints), len(ints) * 2):
with self.subTest(buffersize=buffersize):
res = self.executor.map(add, ints, ints, buffersize=buffersize)
self.assertListEqual(list(res), [0, 2, 4, 6])

def test_map_buffersize_on_infinite_iterable(self):
res = self.executor.map(str, itertools.count(), buffersize=2)
self.assertEqual(next(res, None), "0")
self.assertEqual(next(res, None), "1")
self.assertEqual(next(res, None), "2")

def test_map_buffersize_on_multiple_infinite_iterables(self):
res = self.executor.map(
add,
itertools.count(),
itertools.count(),
buffersize=2
)
self.assertEqual(next(res, None), 0)
self.assertEqual(next(res, None), 2)
self.assertEqual(next(res, None), 4)

def test_map_buffersize_on_empty_iterable(self):
res = self.executor.map(str, [], buffersize=2)
self.assertIsNone(next(res, None))

def test_map_buffersize_without_iterable(self):
res = self.executor.map(str, buffersize=2)
self.assertIsNone(next(res, None))

def test_map_buffersize_when_buffer_is_full(self):
ints = iter(range(4))
buffersize = 2
self.executor.map(str, ints, buffersize=buffersize)
self.executor.shutdown(wait=True) # wait for tasks to complete
self.assertEqual(
next(ints),
buffersize,
msg="should have fetched only `buffersize` elements from `ints`.",
)

def test_shutdown_race_issue12456(self):
# Issue #12456: race condition at shutdown where trying to post a
# sentinel in the call queue blocks (the queue is full while processes
@@ -0,0 +1,4 @@
Add the optional ``buffersize`` parameter to
:meth:`concurrent.futures.Executor.map` to limit the number of submitted tasks
whose results have not yet been yielded. If the buffer is full, iteration over
the *iterables* pauses until a result is yielded from the buffer.