[WIP] Use Joblib backend hints rather than hardcoding #11345

Closed

3 changes: 2 additions & 1 deletion build_tools/travis/install.sh
@@ -93,7 +93,8 @@ elif [[ "$DISTRIB" == "scipy-dev-wheels" ]]; then
fi

if [[ "$COVERAGE" == "true" ]]; then
pip install coverage codecov
pip install hg+http://bitbucket.org/ogrisel/coverage.py/@fix-thread-safety#egg=coverage
pip install codecov
fi

if [[ "$TEST_DOCSTRINGS" == "true" ]]; then
26 changes: 14 additions & 12 deletions sklearn/ensemble/forest.py
@@ -175,7 +175,7 @@ def apply(self, X):
"""
X = self._validate_X_predict(X)
results = Parallel(n_jobs=self.n_jobs, verbose=self.verbose,
backend="threading")(
prefer="threads")(
delayed(parallel_helper)(tree, 'apply', X, check_input=False)
for tree in self.estimators_)
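
For context, prefer="threads" is a soft hint rather than a hard requirement: joblib falls back to the threading backend when no backend has been selected, but a parallel_backend context installed by the caller takes precedence. A minimal sketch of the difference, assuming the vendored joblib exposes the loky backend (the helper _square is hypothetical and stands in for the per-tree work):

    from sklearn.externals.joblib import Parallel, delayed, parallel_backend

    def _square(x):
        # hypothetical helper standing in for the per-tree call
        return x * x

    # No context active: the hint selects the GIL-friendly threading backend.
    results = Parallel(n_jobs=2, prefer="threads")(
        delayed(_square)(i) for i in range(8))

    # A caller-level context overrides the hint, here forcing process workers.
    with parallel_backend("loky", n_jobs=2):
        results = Parallel(prefer="threads")(
            delayed(_square)(i) for i in range(8))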

@@ -206,9 +206,9 @@ def decision_path(self, X):
"""
X = self._validate_X_predict(X)
indicators = Parallel(n_jobs=self.n_jobs, verbose=self.verbose,
backend="threading")(
prefer="threads")(
delayed(parallel_helper)(tree, 'decision_path', X,
check_input=False)
check_input=False)
for tree in self.estimators_)

n_nodes = [0]
@@ -223,8 +223,8 @@ def fit(self, X, y, sample_weight=None):
Parameters
----------
X : array-like or sparse matrix of shape = [n_samples, n_features]
The training input samples. Internally, its dtype will be converted to
``dtype=np.float32``. If a sparse matrix is provided, it will be
The training input samples. Internally, its dtype will be converted
to ``dtype=np.float32``. If a sparse matrix is provided, it will be
converted into a sparse ``csc_matrix``.

y : array-like, shape = [n_samples] or [n_samples, n_outputs]
@@ -315,12 +315,14 @@ def fit(self, X, y, sample_weight=None):
random_state=random_state)
trees.append(tree)

# Parallel loop: we use the threading backend as the Cython code
# Parallel loop: we prefer the threading backend as the Cython code
# for fitting the trees is internally releasing the Python GIL
# making threading always more efficient than multiprocessing in
# that case.
# making threading more efficient than multiprocessing in
# that case. However, we respect any parallel_backend contexts set
# at a higher level, since correctness does not rely on using
# threads.
trees = Parallel(n_jobs=self.n_jobs, verbose=self.verbose,
backend="threading")(
prefer="threads")(
delayed(_parallel_build_trees)(
t, self, X, y, sample_weight, i, len(trees),
verbose=self.verbose, class_weight=self.class_weight)
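
Because correctness of fit() does not depend on shared memory, the prefer="threads" hint used here can be overridden from user code. A hedged sketch of what that enables, assuming only the standard scikit-learn estimator API:

    from sklearn.datasets import make_classification
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.externals.joblib import parallel_backend

    X, y = make_classification(n_samples=200, n_features=10, random_state=0)
    clf = RandomForestClassifier(n_estimators=20, n_jobs=2, random_state=0)

    # With no context, the prefer="threads" hint picks the threading backend.
    clf.fit(X, y)

    # A user-level context is now respected: trees are built in loky worker
    # processes instead, with identical results.
    with parallel_backend("loky"):
        clf.fit(X, y)
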
@@ -367,7 +369,7 @@ def feature_importances_(self):
check_is_fitted(self, 'estimators_')

all_importances = Parallel(n_jobs=self.n_jobs,
backend="threading")(
prefer="threads")(
delayed(getattr)(tree, 'feature_importances_')
for tree in self.estimators_)
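
The delayed(getattr) calls above merely read an attribute from each fitted tree in a worker thread. A small self-contained sketch of the same pattern, using a hypothetical stand-in class instead of real fitted trees:

    from sklearn.externals.joblib import Parallel, delayed

    class _FittedTree(object):
        # hypothetical stand-in for a fitted decision tree
        def __init__(self, importances):
            self.feature_importances_ = importances

    trees = [_FittedTree([0.2, 0.8]), _FittedTree([0.4, 0.6])]

    # Each task simply evaluates getattr(tree, 'feature_importances_').
    all_importances = Parallel(n_jobs=2, prefer="threads")(
        delayed(getattr)(t, 'feature_importances_') for t in trees)
    # all_importances == [[0.2, 0.8], [0.4, 0.6]]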

@@ -583,7 +585,7 @@ class in a leaf.
all_proba = [np.zeros((X.shape[0], j), dtype=np.float64)
for j in np.atleast_1d(self.n_classes_)]
lock = threading.Lock()
Parallel(n_jobs=n_jobs, verbose=self.verbose, backend="threading")(
Parallel(n_jobs=n_jobs, verbose=self.verbose, require="sharedmem")(
delayed(accumulate_prediction)(e.predict_proba, X, all_proba, lock)
for e in self.estimators_)
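
Unlike prefer, require="sharedmem" is a hard constraint: the workers mutate the preallocated all_proba arrays under a lock, so a shared-memory (thread-based) backend is used even when a process-based parallel_backend context is active. A minimal sketch of that accumulation pattern (the _accumulate helper is hypothetical):

    import threading
    import numpy as np
    from sklearn.externals.joblib import Parallel, delayed

    out = np.zeros(4)          # buffer updated in place by all workers
    lock = threading.Lock()

    def _accumulate(value, out, lock):
        # hypothetical stand-in for accumulate_prediction
        with lock:
            out += value

    # require="sharedmem" forces a thread-based backend, even inside a
    # process-based parallel_backend context, so in-place updates are visible.
    Parallel(n_jobs=2, require="sharedmem")(
        delayed(_accumulate)(np.full(4, i), out, lock) for i in range(3))
    # out is now [3., 3., 3., 3.]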

@@ -690,7 +692,7 @@ def predict(self, X):

# Parallel loop
lock = threading.Lock()
Parallel(n_jobs=n_jobs, verbose=self.verbose, backend="threading")(
Parallel(n_jobs=n_jobs, verbose=self.verbose, require="sharedmem")(
delayed(accumulate_prediction)(e.predict, X, [y_hat], lock)
for e in self.estimators_)

32 changes: 32 additions & 0 deletions sklearn/ensemble/tests/test_forest.py
@@ -21,6 +21,10 @@

import pytest

from sklearn.externals.joblib import parallel_backend
from sklearn.externals.joblib import register_parallel_backend
from sklearn.externals.joblib.parallel import LokyBackend

from sklearn.utils.testing import assert_almost_equal
from sklearn.utils.testing import assert_array_almost_equal
from sklearn.utils.testing import assert_array_equal
@@ -1228,3 +1232,31 @@ def test_min_impurity_decrease():
# Simply check if the parameter is passed on correctly. Tree tests
# will suffice for the actual working of this param
assert_equal(tree.min_impurity_decrease, 0.1)


class MyBackend(LokyBackend):
def __init__(self, *args, **kwargs):
self.count = 0
super(MyBackend, self).__init__(*args, **kwargs)

def start_call(self):
self.count += 1
return super(MyBackend, self).start_call()


register_parallel_backend('testing', MyBackend)


def test_backend_respected():
clf = RandomForestClassifier(n_jobs=-1)

with parallel_backend("testing") as (ba, _):
clf.fit(X, y)

assert ba.count > 0

# predict_proba requires shared memory. Ensure that's honored.
with parallel_backend("testing") as (ba, _):
clf.predict_proba(X)

assert ba.count == 0
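
For reference, the parallel_backend context manager yields the active backend instance and the effective n_jobs, which is what the (ba, _) unpacking above relies on. A short usage sketch of the same pattern outside the test:

    from sklearn.externals.joblib import Parallel, delayed, parallel_backend

    with parallel_backend("threading", n_jobs=2) as (backend, n_jobs):
        # backend is the active thread-based backend object, n_jobs the
        # worker count taken from the context
        squares = Parallel(n_jobs=n_jobs)(
            delayed(pow)(i, 2) for i in range(5))
    # squares == [0, 1, 4, 9, 16]
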
51 changes: 22 additions & 29 deletions sklearn/externals/joblib/__init__.py
@@ -1,27 +1,25 @@
"""Joblib is a set of tools to provide **lightweight pipelining in
Python**. In particular, joblib offers:
Python**. In particular:

1. transparent disk-caching of the output values and lazy re-evaluation
1. transparent disk-caching of functions and lazy re-evaluation
(memoize pattern)

2. easy simple parallel computing

3. logging and tracing of the execution

Joblib is optimized to be **fast** and **robust** in particular on large
data and has specific optimizations for `numpy` arrays. It is
**BSD-licensed**.


========================= ================================================
**User documentation:** http://pythonhosted.org/joblib
==================== ===============================================
**Documentation:** http://pythonhosted.org/joblib

**Download packages:** http://pypi.python.org/pypi/joblib#downloads
**Download:** http://pypi.python.org/pypi/joblib#downloads

**Source code:** http://github.com/joblib/joblib
**Source code:** http://github.com/joblib/joblib

**Report issues:** http://github.com/joblib/joblib/issues
========================= ================================================
**Report issues:** http://github.com/joblib/joblib/issues
==================== ===============================================


Vision
@@ -43,9 +41,8 @@
good for resuming an application status or computational job, eg
after a crash.

Joblib strives to address these problems while **leaving your code and
your flow control as unmodified as possible** (no framework, no new
paradigms).
Joblib addresses these problems while **leaving your code and your flow
control as unmodified as possible** (no framework, no new paradigms).

Main features
------------------
@@ -59,16 +56,17 @@
computation to disk and rerun it only if necessary::

>>> from sklearn.externals.joblib import Memory
>>> mem = Memory(cachedir='/tmp/joblib')
>>> cachedir = 'your_cache_dir_goes_here'
>>> mem = Memory(cachedir)
>>> import numpy as np
>>> a = np.vander(np.arange(3)).astype(np.float)
>>> square = mem.cache(np.square)
>>> b = square(a) # doctest: +ELLIPSIS
________________________________________________________________________________
[Memory] Calling square...
square(array([[ 0., 0., 1.],
[ 1., 1., 1.],
[ 4., 2., 1.]]))
square(array([[0., 0., 1.],
[1., 1., 1.],
[4., 2., 1.]]))
___________________________________________________________square - 0...s, 0.0min

>>> c = square(a)
@@ -83,19 +81,12 @@
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]


3) **Logging/tracing:** The different functionalities will
progressively acquire better logging mechanism to help track what
has been ran, and capture I/O easily. In addition, Joblib will
provide a few I/O primitives, to easily define logging and
display streams, and provide a way of compiling a report.
We want to be able to quickly inspect what has been run.

4) **Fast compressed Persistence**: a replacement for pickle to work
3) **Fast compressed Persistence**: a replacement for pickle to work
efficiently on Python objects containing large data (
*joblib.dump* & *joblib.load* ).

..
>>> import shutil ; shutil.rmtree('/tmp/joblib/')
>>> import shutil ; shutil.rmtree(cachedir)

"""

@@ -115,15 +106,16 @@
# Dev branch marker is: 'X.Y.dev' or 'X.Y.devN' where N is an integer.
# 'X.Y.dev0' is the canonical version of 'X.Y.dev'
#
__version__ = '0.11'
__version__ = '0.11.1.dev0'


from .memory import Memory, MemorizedResult
from .memory import Memory, MemorizedResult, register_store_backend
from .logger import PrintTime
from .logger import Logger
from .hashing import hash
from .numpy_pickle import dump
from .numpy_pickle import load
from .compressor import register_compressor
from .parallel import Parallel
from .parallel import delayed
from .parallel import cpu_count
@@ -134,4 +126,5 @@

__all__ = ['Memory', 'MemorizedResult', 'PrintTime', 'Logger', 'hash', 'dump',
'load', 'Parallel', 'delayed', 'cpu_count', 'effective_n_jobs',
'register_parallel_backend', 'parallel_backend']
'register_parallel_backend', 'parallel_backend',
'register_store_backend', 'register_compressor']