-
-
Notifications
You must be signed in to change notification settings - Fork 31.8k
bpo-29842: Make Executor.map less eager so it handles large/unbounded… #18566
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
… input iterables appropriately
…lds no reference to result at the moment it yields Reduce line lengths to PEP8 limits
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi there! sorry for intruding. I was looking for this feature and decided to try this patch out. Works great! Just found a small issue so I sent you a comment.
Thank you!
except StopIteration: | ||
argsiter = None | ||
else: | ||
fs.append(self.submit(fn, *args)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the executor has been shut down, this will raise:
cannot schedule new futures after shutdown
But, also the base executor holds no state, so at this level it'll be pretty hard to tell if the executor has been shut down or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rdarder
this could catch that RuntimeError and just return, or the executor could keep a weakset of result_iterators and close them on shutdown
Doc/library/concurrent.futures.rst
Outdated
.. versionchanged:: 3.5 | ||
Added the *chunksize* argument. | ||
|
||
.. versionchanged:: 3.9 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.. versionchanged:: 3.9 | |
.. versionchanged:: 3.11 |
While waiting for this to be included in Python, one can easily copy&paste the map function out of this version and call it with the executor as the first argument. Hoping that you get this merged soon! |
@graingert Tests are failing, can you fix them and rebase to main? I'll then review and run through the buildbots. |
Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst
Outdated
Show resolved
Hide resolved
Co-authored-by: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com>
else: | ||
yield fs.pop().result(end_time - time.monotonic()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this doesn't cancel the currently waited on fut correctly: #95166
import contextlib
import functools
import concurrent.futures
import threading
import sys
def fn(num, stop_event):
if num == 1:
stop_event.wait()
return "done 1"
if num == 2:
return "done 2"
def main():
stop_event = threading.Event()
log = []
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
def print_n_wait(ident):
log.append(f"{ident=} started")
try:
stop_event.wait()
finally:
log.append(f"{ident=} stopped")
fut = pool.submit(print_n_wait, ident="first")
try:
with contextlib.closing(pool.map(print_n_wait, ["second", "third"], timeout=1)) as gen:
try:
next(gen)
except concurrent.futures.TimeoutError:
print("timed out")
else:
raise RuntimeError("timeout expected")
finally:
stop_event.set()
assert log == ["ident='first' started", "ident='first' stopped"], f"{log=} is wrong"
if __name__ == "__main__":
sys.exit(main())
@graingert You can run |
Hey all, I took the inspiration from this PR and continued the work in #114975 |
Hi, fyi here is a follow up PR: #125663 🙏🏻 |
… input iterables appropriately
bugs.python.org/issue29842
bugs.python.org/issue29842
recreate of #707 with conflicts fixed and versionchanged updated to py3.9
/cc @MojoVampire
https://bugs.python.org/issue29842