
Deadlock hang when using asyncio await on sklearn RandomForestClassifier #1605

Open · NathanielRN opened this issue Jul 31, 2024 · 6 comments

@NathanielRN

Hey team!

I was debugging a deadlock in our release candidate that only happens every ~3 days. I really wanted to provide some code, but I'm not able to reproduce it 😞.

Our setup includes 2 Python processes which each independently use joblib.load() to load a pickled sklearn.ensemble.RandomForestClassifier.

Although it was trained with n_jobs=-1, when we actually call sklearn's predict_proba on it we hardcode n_jobs=1. This is our final model:

RandomForestClassifier(
    bootstrap=True,
    ccp_alpha=0.0,
    class_weight=None,
    criterion='gini',
    max_depth=None,
    max_features='auto',
    max_leaf_nodes=None,
    max_samples=None,
    min_impurity_decrease=0.0,
    min_impurity_split=None,
    min_samples_leaf=1,
    min_samples_split=2,
    min_weight_fraction_leaf=0.0,
    n_estimators=200,
    n_jobs=1,
    oob_score=True,
    random_state=0,
    verbose=True,
    warm_start=False)

I read through countless issues, but the top 3 interesting things I learned were these:

  1. GridSearchCV freezes indefinitely with multithreading enabled (i.e. w/ n_jobs != 1) scikit-learn/scikit-learn#5115 (comment)

Others were seeing a hang with sklearn + joblib as well, and the solution was to use the threading backend. Since I don't control the joblib part of the code, I simply discovered that sklearn will use joblib's Parallel to execute my n_jobs=1 call (GitHub - scikit-learn#_forest.py#L953-L956), which results in the SequentialBackend being selected:

if n_jobs == 1:
    # Avoid unnecessary overhead and use sequential backend instead.
    raise FallbackToBackend(
        SequentialBackend(nesting_level=self.nesting_level))

joblib's Parallel will in turn make calls to a BatchCompletionCallBack:

joblib/joblib/parallel.py

Lines 1406 to 1418 in f70939c

batch_tracker = BatchCompletionCallBack(
    dispatch_timestamp, batch_size, self
)
if self.return_ordered:
    self._jobs.append(batch_tracker)

# If return_ordered is False, the batch_tracker is not stored in the
# jobs queue at the time of submission. Instead, it will be appended to
# the queue by itself as soon as the callback is triggered to be able
# to return the results in the order of completion.
job = self._backend.apply_async(batch, callback=batch_tracker)

Which eventually executes my job:

joblib/joblib/parallel.py

Lines 597 to 599 in f70939c

with parallel_config(backend=self._backend, n_jobs=self._n_jobs):
    return [func(*args, **kwargs)
            for func, args, kwargs in self.items]

What I was surprised to see was that at this point, the self._backend has switched to the LokyBackend. I found that it's because of these 2 pieces of code:

joblib/joblib/parallel.py

Lines 1505 to 1508 in f70939c

tasks = BatchedCalls(islice[i:i + final_batch_size],
                     self._backend.get_nested_backend(),
                     self._reducer_callback,
                     self._pickle_cache)

# SequentialBackend should neither change the nesting level, the
# default backend or the number of jobs. Just return the current one.
return get_active_backend()

This backend concerns me because I can see it is called with a self._backend_args['context'] of multiprocessing.context.ForkContext, and I know a fork can hang (see numpy/numpy#9248 (comment)):

joblib/joblib/parallel.py

Lines 1261 to 1262 in f70939c

if DEFAULT_MP_CONTEXT is not None:
    self._backend_args['context'] = DEFAULT_MP_CONTEXT

But as far as I can tell these args are completely ignored.

Either way, even if the backend is a LokyBackend at this point, the sklearn code defines a function that does not even call joblib's apply_async, so I wouldn't expect it to hang. (ref: GitHub - scikit-learn#_forest.py#L720-L733)

So I can't apply the fix in the sklearn issue, but I don't see how that could help me anyway.
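To double-check that reading of the dispatch path, here is a tiny standalone probe using only the public joblib API (the square function is just a throwaway work item for illustration). Depending on the joblib version, the verbose output either names SequentialBackend explicitly or prints no "Using backend ..." line at all:

from joblib import Parallel, delayed

def square(x):
    # Trivial work item, only here to exercise the dispatch path.
    return x * x

# With n_jobs=1, joblib takes the FallbackToBackend(SequentialBackend) branch
# quoted above, so every call runs inline in the calling thread.
results = Parallel(n_jobs=1, verbose=10)(delayed(square)(i) for i in range(5))
print(results)  # [0, 1, 4, 9, 16]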

  2. KeyError with joblib and sklearn cross_validate #852 (comment)

This issue talks about a hang that was fixed by changing the dask backend to stop using a client.scatter function. This reminds me to mention that we have fixed the hang by removing all "async" and "await" keywords from the sklearn predict_proba call. That doesn't mean we've removed it from the asyncio loop; it's still there. However, for some reason, calling sklearn from an async context is causing a hang (see the sketch below).

So this makes me ask: is it possible that there is another hold & wait somewhere in either the SequentialBackend or the LokyBackend I mentioned? I know it doesn't sound possible, but it's my current hypothesis...

In our logs for the hang, I see `Process A` start the `sklearn` call, which always finishes in < 1 second, and next I see `Process B` **also** start a call to the model. `Process B` continues without a problem, but `Process A` gets stuck for ~30 minutes, until `Process B` runs some totally unrelated `numpy` code and `Process A` wakes up and finishes with no problem.
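For illustration, this is roughly how the blocking call could be pushed off the event loop thread instead of being awaited directly; a sketch only, with hypothetical model and features arguments (on Python 3.8, loop.run_in_executor plays the role asyncio.to_thread would on 3.9+):

import asyncio

async def predict_off_loop(model, features):
    # predict_proba is a synchronous, CPU-bound call; running it in the default
    # thread-pool executor keeps the event loop free to service other tasks.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, model.predict_proba, features)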

  3. StackOverflow - 12615525 - what-are-the-different-use-cases-of-joblib-versus-pickle

Here I learned that joblib can share memory if the numpy arrays it loads are big enough. Could that cause a hold & wait deadlock scenario?

Dependencies:

  • scikit-learn == 0.22
  • joblib == 1.2
  • platform=linux AL2 instance

Very sorry for the verbosity, but thank you very much for your help and I really appreciate any insight or ideas into this! 😃

cc @ogrisel if you have time! But please no worries if you don't 😄

@ogrisel
Contributor

ogrisel commented Jul 31, 2024

Those are very old versions of scikit-learn and joblib. Those problems might have been fixed in the meantime, but it's impossible to tell without a reproducer.

If you switch to the latest stable versions, you should be able to inspect which backend is actually used by setting the verbose setting to a large value, e.g. 10 or more:

from joblib import parallel_config

parallel_config(verbose=10)

On older versions, it might be possible to get verbose output by setting the verbose parameter of the RandomForestClassifier instance instead (e.g. rf.set_params(n_jobs=1, verbose=100)).

I find it surprising that you suspect it's using process-based parallelism, because the RandomForestClassifier.predict_proba method uses a Parallel(n_jobs=n_jobs, verbose=self.verbose, require="sharedmem") instance; hence, it should use the threading-based backend.
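For what it's worth, that is easy to check in isolation with a throwaway Parallel call (standard joblib API, trivial workload just for the demonstration):

from joblib import Parallel, delayed

# require="sharedmem" forces a shared-memory (thread-based) backend even with
# n_jobs > 1, which is what predict_proba relies on to fill a shared array.
Parallel(n_jobs=2, verbose=10, require="sharedmem")(
    delayed(abs)(-i) for i in range(4)
)
# Expected backend line in the verbose output:
# [Parallel(n_jobs=2)]: Using backend ThreadingBackend with 2 concurrent workers.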

Let's consider the following:

  • train_rf.py
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from joblib import dump

iris = load_iris()
X, y = iris.data, iris.target
rf = RandomForestClassifier().fit(X, y)

dump(rf, 'rf.joblib')
  • predict_proba_with_rf.py
from sklearn.datasets import load_iris
from joblib import load

iris = load_iris()

rf = load('rf.joblib')
rf.set_params(n_jobs=2, verbose=10)
rf.predict_proba(iris.data)

When I execute the above code, I get:

$ python predict_proba_with_rf.py
[Parallel(n_jobs=2)]: Using backend ThreadingBackend with 2 concurrent workers.
[Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    0.0s
[Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    0.0s
[Parallel(n_jobs=2)]: Done   9 tasks      | elapsed:    0.0s
[Parallel(n_jobs=2)]: Done  14 tasks      | elapsed:    0.0s
[Parallel(n_jobs=2)]: Done  21 tasks      | elapsed:    0.0s
[Parallel(n_jobs=2)]: Done  28 tasks      | elapsed:    0.0s
[Parallel(n_jobs=2)]: Done  37 tasks      | elapsed:    0.0s
[Parallel(n_jobs=2)]: Done  46 tasks      | elapsed:    0.0s
[Parallel(n_jobs=2)]: Done  57 tasks      | elapsed:    0.0s
[Parallel(n_jobs=2)]: Done  68 tasks      | elapsed:    0.0s
[Parallel(n_jobs=2)]: Done  81 tasks      | elapsed:    0.0s
[Parallel(n_jobs=2)]: Done  94 tasks      | elapsed:    0.0s
[Parallel(n_jobs=2)]: Done 100 out of 100 | elapsed:    0.0s finished

so it's indeed using thread-based parallelism, as expected.

When I edit the script to call .set_params(n_jobs=1, verbose=10) instead, I see no backend at all (sequential execution) in recent joblib versions:

$ python predict_proba_with_rf.py
[Parallel(n_jobs=1)]: Done   1 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done   4 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done   7 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  12 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  17 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  24 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  31 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  40 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  49 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  60 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  71 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  84 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  97 tasks      | elapsed:    0.0s

To get more debugging info, you can also use faulthandler in your original code to get a full traceback after a timeout (in case of deadlock). For instance, if you expect your prediction code to always last less than 10s, you can wrap the prediction code as follows:

from sklearn.datasets import load_iris
from joblib import load
from faulthandler import dump_traceback_later, cancel_dump_traceback_later

iris = load_iris()

rf = load('rf.joblib')
rf.set_params(n_jobs=2, verbose=10)

dump_traceback_later(timeout=10, exit=True)
rf.predict_proba(iris.data)
cancel_dump_traceback_later()

and report the results here.

See the official doc for details on faulthandler usage:

https://docs.python.org/3/library/faulthandler.html
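As a side note, for a long-running service where you do not want a hard timeout, faulthandler can also be armed to dump all thread stacks on demand via a signal (a sketch, Unix only; the choice of SIGUSR1 is arbitrary):

import faulthandler
import signal
import sys

# After this, `kill -USR1 <pid>` makes the process print the traceback of every
# thread to stderr without killing it, which is handy for inspecting a hang.
faulthandler.register(signal.SIGUSR1, file=sys.stderr, all_threads=True)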

@ogrisel
Contributor

ogrisel commented Jul 31, 2024

Ideally, please provide a minimal reproducer along the lines of the code snippets I posted in the above comment, but also adding an asyncio await pattern similar to what you do in your setup.

@NathanielRN
Author

NathanielRN commented Aug 1, 2024

Thanks so much for the dump_traceback_later idea @ogrisel! That sounds like exactly what I need. I'll deploy it to our testing stage and wait for the error to come back 🤞

For sure I can share code! I came up with this script to minimally reproduce how we are calling the model in our application. I replaced the production call pattern with a testing call pattern for ease of testing, but I left the production one in (commented out) for completeness of conversation!

import asyncio
from asyncio.events import AbstractEventLoop
from typing import BinaryIO
import joblib
import numpy as np
from past.utils import old_div

from sklearn.ensemble import RandomForestClassifier
import numpy.typing as npt

model_path = "/biz/baz/my_model.pkl"


def load_pickle(fname):
    with open(fname, "rb") as fd:
        obj = joblib.load(fd) # Based on your example, this is not necessary.
    return obj


model_backend: RandomForestClassifier = load_pickle(model_path)
model_backend.n_jobs = 1 # Based on your example, I should be using `set_params` instead
clf_mean: npt.ArrayLike = model_backend.mean
clf_std: npt.ArrayLike = model_backend.std
model_backend.set_params(verbose=10)


async def predict(features: npt.ArrayLike) -> npt.ArrayLike:
    m_feat = features - clf_mean.reshape(1, -1)
    normalized_features = old_div(m_feat, clf_std.reshape(1, -1))

    prob = model_backend.predict_proba(normalized_features)
    return prob


NUM_TASKS = 15 # arbitrary
FEATURES_INPUT = np.random.rand(*(7_500, 226)) # This is the max we will see, but the hang can happen for a much smaller input size.

_semaphore = asyncio.Semaphore(5)


async def rate_limited_req():
    async with _semaphore:
        # Several other `await` calls to convert `req` into features...
        ans = await predict(FEATURES_INPUT)
        # Write `ans` back to a response Named Pipe


# def production_call_pattern(loop: AbstractEventLoop):
#     # In our system, we actually invoke the model by responding to file writes:
#     def _execute(req_pipe: BinaryIO):
#         req = req_pipe.readline()
#         asyncio.ensure_future(rate_limited_req(), loop=loop)

#     request_pipe = open("/foo/bar.0", "rb", buffering=0)
#     loop.add_reader(request_pipe, _execute, request_pipe)
#     loop.run_forever()


def testing_call_pattern(loop: AbstractEventLoop):
    async def testing_create_tasks():
        predict_tasks = [loop.create_task(rate_limited_req()) for _ in range(NUM_TASKS)]
        [await task for task in predict_tasks]

    loop.run_until_complete(testing_create_tasks())


def main():
    loop = asyncio.get_event_loop()

    # production_call_pattern(loop)

    testing_call_pattern(loop)


main()

Which outputs many times:

[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   2 out of   2 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   3 out of   3 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   4 out of   4 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   5 out of   5 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   6 out of   6 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   7 out of   7 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   8 out of   8 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   9 out of   9 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=1)]: Done 200 out of 200 | elapsed:    0.1s finished

Please let me know if you have any questions!

@NathanielRN
Author

NathanielRN commented Aug 1, 2024

@ogrisel I saw the issue reproduce and got the full stack trace for it! Notice that I used a 10s timeout just to make sure it triggers, but I know that in normal operation this call never takes more than 5s to finish. This is what I'm seeing:

01:06:28 `Process A` - Start the `sklearn` `predict_proba` call of size `180 x 226`.
01:06:28 `Process A` - Done the `sklearn` `predict_proba`.
01:06:28 `Process A` - Start the `sklearn` `predict_proba` call of size `180 x 172`. (Will never finish)
01:06:40 `Process B` - Start the `sklearn` `predict_proba` call of size `225 x 226`.
01:06:40 `Process B` - Done the `sklearn` `predict_proba` call.
01:06:40 `Process B` - Start the `sklearn` `predict_proba` call of size `225 x 172`.
01:06:40 `Process B` - Done the `sklearn` `predict_proba` call.

01:06:40,698 `Process A` - [Parallel(n_jobs=1)]: Done   1 tasks      | elapsed:    0.0s
01:06:40,713 `Process A` - [Parallel(n_jobs=1)]: Done 200 tasks      | elapsed:    0.0s
01:06:40,713 `Process A` - [Parallel(n_jobs=1)]: Done 200 tasks      | elapsed:    0.0s
01:06:40,713 `Process A` - [Parallel(n_jobs=1)]: Done   1 tasks      | elapsed:    0.0s
01:06:40,725 `Process A` - [Parallel(n_jobs=1)]: Done 200 tasks      | elapsed:    0.0s
01:06:40,725 `Process A` - [Parallel(n_jobs=1)]: Done 200 tasks      | elapsed:    0.0s
01:06:40,725 `Process A` - [Parallel(n_jobs=1)]: Done   1 tasks      | elapsed:    0.0s
01:06:40,738 `Process A` - [Parallel(n_jobs=1)]: Done 200 tasks      | elapsed:    0.0s
01:06:40,764 `Process A` - [Parallel(n_jobs=1)]: Done   1 tasks      | elapsed:    0.0s
01:06:40,890 `Process A` - Error reading from input stream
01:06:40,890 `Process A` -   File "/my_machine/lib/python3.8/site-packages/my_app_that_calls_sklearn.py", line 69 in _peri_process
01:06:40,890 `Process A` -   File "/my_machine/lib/python3.8/site-packages/sklearn/ensemble/_forest.py", line 665 in predict_proba
01:06:40,890 `Process A` -   File "/my_machine/lib/python3.8/site-packages/joblib/parallel.py", line 1918 in __call__
01:06:40,890 `Process A` -   File "/my_machine/lib/python3.8/site-packages/joblib/parallel.py", line 1849 in _get_sequential_output
01:06:40,890 `Process A` -   File "/my_machine/lib/python3.8/site-packages/joblib/parallel.py", line 1573 in print_progress
01:06:40,890 `Process A` -   File "/my_machine/lib/python3.8/site-packages/joblib/parallel.py", line 1538 in _print
01:06:40,890 `Process A` - Thread 0x0000007f956df010 (most recent call first):
01:06:40,890 `Process A` -   File "/my_machine/python3.8/lib/python3.8/threading.py", line 890 in _bootstrap
01:06:40,889 `Process A` -   File "/my_machine/python3.8/lib/python3.8/threading.py", line 932 in _bootstrap_inner
01:06:40,766 `Process A` - [Parallel(n_jobs=1)]: Done 200 tasks      | elapsed:    0.0s
01:06:40,766 `Process A` - [Parallel(n_jobs=1)]: Done   1 tasks      | elapsed:    0.0s
01:06:40,802 `Process A` - [Parallel(n_jobs=1)]: Done 200 tasks      | elapsed:    0.1s
01:06:40,802 `Process A` - [Parallel(n_jobs=1)]: Done 200 tasks      | elapsed:    0.1s
01:06:40,802 `Process A` - [Parallel(n_jobs=1)]: Done   1 tasks      | elapsed:    0.0s
01:06:40,853 `Process A` - [Parallel(n_jobs=1)]: Done 200 tasks      | elapsed:    0.1s
01:06:40,853 `Process A` - [Parallel(n_jobs=1)]: Done 200 tasks      | elapsed:    0.1s
01:06:40,859 `Process A` - [Parallel(n_jobs=1)]: Done   1 tasks      | elapsed:    0.0s
01:06:40,875 `Process A` - [Parallel(n_jobs=1)]: Done 200 tasks      | elapsed:    0.0s
01:06:40,875 `Process A` - [Parallel(n_jobs=1)]: Done 200 tasks      | elapsed:    0.0s
01:06:40,875 `Process A` - [Parallel(n_jobs=1)]: Done   1 tasks      | elapsed:    0.0s
01:06:40,877 `Process A` - [ParallelTimeout (0:00:10)!
01:06:40,877 `Process A` - Thread 0x0000007f8f7301f0 (most recent call first):
01:06:40,878 `Process A` -   File "/my_machine/python3.8/lib/python3.8/threading.py", line 306 in wait
01:06:40,878 `Process A` -   File "/my_machine/python3.8/lib/python3.8/queue.py", line 179 in get
01:06:40,878 `Process A` -   File "/my_machine/lib/python3.8/site-packages/codeguru_profiler_agent/utils/execution_state.py", line 95 in _wait_for_execution_time
01:06:40,878 `Process A` -   File "/my_machine/lib/python3.8/site-packages/codeguru_profiler_agent/utils/execution_state.py", line 133 in wait_for_next_tick_or_stop
01:06:40,878 `Process A` -   File "/my_machine/lib/python3.8/site-packages/codeguru_profiler_agent/utils/scheduler.py", line 81 in _schedule_task_execution
01:06:40,878 `Process A` - [Parallel(n_jobs=1)]: Done   9 tasks      | elapsed:    0.0s
01:06:44,524 `Process A` - Error reading from input stream

From what I can tell, we are stuck waiting for a thread to acquire a lock at this line: GitHub Repo - cpython#threading.py#L306. The stack trace says codeguru_profiler_agent/utils/execution_state.py (GitHub Repo - codeguru_profiler_agent#execution_state.py#L95) is the one waiting for the lock.

It concerns me that the docs say:

This method releases the underlying lock, and then blocks until it is
awakened by a notify() or notify_all() call for the same condition
variable in another thread, or until the optional timeout occurs. Once
awakened or timed out, it re-acquires the lock and returns.

Again, we are using n_jobs=1, so joblib will use the SequentialBackend with the threading library, which should be fine. But maybe joblib and the codeguru_profiler are both trying to acquire the same lock and getting stuck? Maybe joblib from Process B steals it from both joblib and codeguru_profiler, and that's why the deadlock has joblib as the final log output in the process?

EDIT:

  1. It still doesn't work with joblib 1.4.2, and locally it seems that SequentialBackend throws an exception when called from an async context?
  2. This does happen even if we remove await from the predict_proba call.

Still testing possible solutions.

@NathanielRN
Author

NathanielRN commented Aug 15, 2024

Hey @ogrisel,

We saw this happen, and luckily the suggestion with dump_traceback_later(timeout=10, exit=True) worked. See the dumped stack trace:

07:18:26 | File "/my_app_that_calls_sklearn.py", line 105 in dump_traceback_on_timeout
07:18:26 | File "/opt/amazon/lib/python3.8/site-packages/joblib/parallel.py", line 915 in _print
07:18:26 | File "/opt/amazon/lib/python3.8/site-packages/joblib/parallel.py", line 1041 in __call__
07:18:26 | File "/opt/amazon/lib/python3.8/site-packages/sklearn/ensemble/_forest.py", line 665 in predict_proba

In joblib 1.2, we are stuck inside a sys.stderr.write call:

writer('[%s]: %s\n' % (self, msg))

This led me to an important discovery. As mentioned since 2009 in python/cpython#50970 (comment), it is known that if you fork two Python processes, and the second fork occurs while the first fork is holding the lock, the second fork will deadlock as soon as it uses its inherited "held" lock.

This makes sense. In my case I have 2 Python processes that both call sklearn and both use the sys.stderr.write log stream. Even though I cannot reproduce it and I haven't proven that the lock was held at the time of the 2nd fork, this is my best theory right now.
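To convince myself the mechanism is real, independent of joblib, here is a minimal standalone sketch of that failure mode (Linux only, purely illustrative, not our production code):

import os
import threading
import time

lock = threading.Lock()

def hold_lock_forever():
    # Stands in for e.g. another thread being inside a stderr write at fork time.
    lock.acquire()
    time.sleep(3600)

threading.Thread(target=hold_lock_forever, daemon=True).start()
time.sleep(0.1)  # give the thread time to grab the lock before we fork

pid = os.fork()
if pid == 0:
    # Child: the lock object was copied in its acquired state, but the thread
    # that owned it does not exist here, so this acquire blocks forever.
    lock.acquire()
    os._exit(0)
else:
    print("parent: child", pid, "will now hang trying to acquire the inherited lock")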

I found Delgan/loguru#231 (comment), which explains the issue really well and implements a fix in Delgan/loguru@8c42511:

# We need to use a lock to protect sink during fork.
# Particularly, writing to stderr may lead to deadlock in child process.
lock = create_handler_lock()
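In plain Python terms, the pattern boils down to something like the following with os.register_at_fork (only a sketch of the idea, not joblib or loguru code; _stderr_lock is a hypothetical lock guarding the writes):

import os
import sys
import threading

_stderr_lock = threading.Lock()

def _acquire_before_fork():
    # Taken by the forking thread, so no other thread can be mid-write and
    # holding the lock at the exact moment of the fork.
    _stderr_lock.acquire()

def _release_after_fork():
    # Both the parent and the child release their own copy afterwards,
    # leaving the lock in a sane state on both sides.
    _stderr_lock.release()

os.register_at_fork(
    before=_acquire_before_fork,
    after_in_parent=_release_after_fork,
    after_in_child=_release_after_fork,
)

def safe_stderr_write(msg: str) -> None:
    with _stderr_lock:
        sys.stderr.write(msg)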

If a fix like this interests the joblib team, I'm happy to help; otherwise we can close this issue and hope the next developer stumbles across it 🙂 Please let me know if you have any follow-up questions!

@NathanielRN
Author

NathanielRN commented Aug 20, 2024

EDIT: We confirmed in production that disabling stderr logs fixed our deadlock issue once and for all!

So in reading KimiNewt/pyshark#26, we learned that because joblib writes to sys.stderr, and because Python buffers and flushes stderr (StackOverflow - 22478875#7460739), our latest theory is this:

  1. joblib is trying to write to stderr but the buffer is full (see the sketch below)
  2. Python is trying to clear the stderr buffer, but it's waiting for joblib to finish
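(For intuition on step 1: a write can block at the OS level once the receiving pipe buffer fills and nobody drains it. Here is an illustrative sketch with a plain pipe, unrelated to our actual service:)

import os

r, w = os.pipe()
# A Linux pipe buffer is typically 64 KiB; fill it without anyone reading.
os.write(w, b"x" * 65536)
print("buffer filled; the next write blocks until someone reads from r")
os.write(w, b"one more byte")  # blocks here indefinitely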

We don't even care about the stderr output because our service uses the logging module to emit logs. So the solution we will try is to modify the sklearn RandomForestClassifier object we unpickle:

model_backend.verbose = False
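For completeness, the equivalent using the setter ogrisel suggested earlier in the thread:

model_backend.set_params(verbose=0)  # joblib treats 0/False as "no output"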

