bpo-22393: Fix multiprocessing.Pool hangs if a worker process dies unexpectedly #10441
Conversation
Hello, and thanks for your contribution! I'm a bot set up to make sure that the project can legally accept your contribution by verifying you have signed the PSF contributor agreement (CLA). Our records indicate we have not received your CLA. For legal reasons we need you to sign this before we can look at your contribution. Please follow the steps outlined in the CPython devguide to rectify this issue. If you have recently signed the CLA, please wait at least one business day. You can check yourself to see if the CLA has been received. Thanks again for your contribution; we look forward to reviewing it!
This PR relates to nipy#2700, and should fix the problem underlying nipy#2548. I first considered adding a control thread that monitors the `Pool` of workers, but that would require a large overhead keeping track of PIDs and polling very often. Just adding the core file of [bpo-22393](python/cpython#10441) should fix nipy#2548
Just a couple comments, pending review from the cpython devs.
Hi @pitrou (or anyone with a say), can you give us a hint about the fate of this PR (even if you honestly think it does not have a very promising future)? Thanks
Sorry for the delay @oesteban. I've made a couple of comments, you might want to address them.
Also, it seems you'll need to merge/rebase from master and fix any conflicts.
A Python core developer has requested some changes be made to your pull request before we can consider merging it. If you could please address their requests along with any other requests in other reviews from core developers that would be appreciated. Once you have made the requested changes, please leave a comment on this pull request containing the phrase `I have made the requested changes; please review again`. And if you don't make the requested changes, you will be poked with soft cushions!
I have made the requested changes; please review again
Thanks for making the requested changes! @pitrou: please review the changes made to this pull request.
Pinging @pitrou, at least to know if the changes pointed in the right direction.
Sorry, will take a look again. Also @pablogsal you may be interested in this.
Bumping up!
Are there any plans for deprecating multiprocessing? Otherwise, I think this bug should be addressed. If the proposed fix is not the right way of fixing it, please let me know. I'll resolve the conflicts only once I know there is interest in doing so. Thanks very much.
@pierreglaser @tomMoral Would you like to take a look at this?
Yes, I can have a look.
I'll have a look too.
@pitrou thanks for the prompt response!
Here is a first review. @tomMoral's one should land sometime next week :)
```python
class BrokenProcessPool(RuntimeError):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    """
```
Maybe avoid using the `ProcessPoolExecutor` and `future` terms, which are objects of the `concurrent.futures` package and not the `multiprocessing` package.
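For contrast, the behavior this PR ports to `multiprocessing.Pool` already exists in `concurrent.futures`. A minimal runnable sketch (the helper names here are illustrative, not from the patch) showing the executor flagging itself broken when a worker dies:

```python
import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool

def _die():
    # Hard-exit the worker process, bypassing Python's exception
    # machinery, so the executor sees the process disappear.
    os._exit(1)

def check_broken_pool():
    with ProcessPoolExecutor(max_workers=1) as ex:
        fut = ex.submit(_die)
        try:
            fut.result(timeout=30)
            return False
        except BrokenProcessPool:
            # The dead worker fails the pending future with
            # BrokenProcessPool instead of hanging forever.
            return True

if __name__ == "__main__":
    print(check_broken_pool())
```

This is the same failure mode the PR addresses for `multiprocessing.Pool`, which currently hangs in this situation rather than raising.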
```python
util.debug('terminate pool entering')
is_broken = BROKEN in (task_handler._state,
                       worker_handler._state,
                       result_handler._state)
```

```python
worker_handler._state = TERMINATE
```
No need to use the `_worker_state_lock` here? And in other places where `_worker_handler._state` is manipulated?
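For reference, the pattern being asked about looks roughly like this sketch (the `HandlerState` class and state constants are hypothetical stand-ins, not patch code; only the `_worker_state_lock` name follows the PR): every write to the shared state takes the lock, so handler threads never race on a transition.

```python
import threading

# Illustrative state constants, mirroring multiprocessing.pool's style
RUN, TERMINATE, BROKEN = 0, 2, 3

class HandlerState:
    """Hypothetical stand-in for a pool handler holding shared state."""
    def __init__(self):
        self._worker_state_lock = threading.Lock()
        self._state = RUN

    def set_state(self, new_state):
        # All writers take the lock, so a reader holding the same
        # lock always observes a complete state transition.
        with self._worker_state_lock:
            self._state = new_state

    def get_state(self):
        with self._worker_state_lock:
            return self._state
```

The reviewer's point is that if only *some* call sites take the lock, the lock protects nothing; either all reads and writes of `_worker_handler._state` go through it, or it can be dropped.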
```python
    util.debug('helping task handler/workers to finish')
    cls._help_stuff_finish(inqueue, task_handler, len(pool))
else:
    util.debug('finishing BROKEN process pool')
```
What happens here if the `task_handler` is blocked, but we do not run `_help_stuff_finish`?
```python
err = BrokenProcessPool(
    'A worker in the pool terminated abruptly.')
# Exhaust MapResult with errors
```
This also applies to `ApplyResult`, right?
```python
err = BrokenProcessPool(
    'A worker in the pool terminated abruptly.')
# Exhaust MapResult with errors
for i, cache_ent in list(self._cache.items()):
```
Out of curiosity, is there any reason why we iterate on a list of the items of `self._cache`?
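One plausible answer (my reading of the pattern, not confirmed by the PR authors): setting an error on a result removes it from the cache, and mutating a dict while iterating over it raises `RuntimeError`, so the loop must walk a snapshot. A simplified, hypothetical sketch of the mechanism:

```python
class ResultStub:
    """Hypothetical, stripped-down stand-in for ApplyResult/MapResult."""
    def __init__(self, cache, job):
        self._cache = cache
        self._job = job
        self._cache[job] = self
        self.error = None

    def _set(self, i, obj):
        success, value = obj
        if not success:
            self.error = value
        # Delivering a result removes the entry from the shared cache,
        # mutating the dict the caller may be iterating over.
        del self._cache[self._job]

def fail_all(cache, err):
    # list(...) snapshots the items: iterating cache.items() directly
    # would raise RuntimeError once _set() deletes an entry.
    for job, result in list(cache.items()):
        result._set(job, (False, err))

cache = {}
results = [ResultStub(cache, j) for j in range(3)]
fail_all(cache, RuntimeError('worker died'))
```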
There are multiple tests being added that use `sleep` to synchronize processes (in particular, they assume that the processes will have started in time when the sleep finishes). This is very unreliable and it will most certainly fail on the slowest buildbots. Please, try to add some synchronization to the tests to make them more deterministic.
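As an illustration of the kind of synchronization being requested, a test can hand the worker a `multiprocessing.Event` and wait on it instead of sleeping. This is a hypothetical sketch, not code from this PR:

```python
import multiprocessing as mp

def worker(started, release):
    started.set()    # tell the test we are definitely inside the worker
    release.wait()   # block until the test decides to let us finish

def run_synced():
    started = mp.Event()
    release = mp.Event()
    p = mp.Process(target=worker, args=(started, release))
    p.start()
    # Deterministic: no guessing how long the worker takes to start,
    # so slow buildbots only make the wait longer, never flaky.
    assert started.wait(timeout=30)
    release.set()
    p.join(timeout=30)
    return p.exitcode

if __name__ == "__main__":
    print(run_synced())
```

The timeouts act only as a safety net against a genuinely hung process; they never gate the happy path the way a bare `sleep` does.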
Note that this PR, while improving the current state of the `Pool`, does not cover cases like:

```python
import sys
from multiprocessing import Pool

pool = Pool(2)
pool.apply(sys.exit, (0,))
```

or at unpickling time:

```python
import sys
from multiprocessing import Pool

class Failure:
    def __reduce__(self):
        return sys.exit, (0, )

pool = Pool(2)
pool.apply(id, (Failure(),))
```

Also, many other problems exist with … Maybe a more stable solution would be to actually change the …
This PR is stale because it has been open for 30 days with no activity.
This PR fixes bpo-22393.
Three new unit tests have been added.
https://bugs.python.org/issue22393